This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new fddab22  Use repartition in window functions to speed up (#569)
fddab22 is described below

commit fddab22aa562750f67385a961497dc020b18c4b2
Author: Jiayu Liu <[email protected]>
AuthorDate: Thu Jul 1 05:00:03 2021 +0800

    Use repartition in window functions to speed up (#569)
    
    * implement window functions with partition by
    
    * fix partition requirement
---
 datafusion/src/execution/context.rs     | 11 ++++++
 datafusion/src/physical_plan/planner.rs | 42 +++++++++++++++++++---
 datafusion/src/physical_plan/windows.rs | 62 +++++++--------------------------
 datafusion/src/sql/utils.rs             | 24 +++++++++++++
 4 files changed, 86 insertions(+), 53 deletions(-)

diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 5df8e20..436bce5 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -630,6 +630,9 @@ pub struct ExecutionConfig {
     /// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
     /// using the provided `concurrency` level
     pub repartition_aggregations: bool,
+    /// Should DataFusion repartition data using the partition keys to execute window functions in
+    /// parallel using the provided `concurrency` level
+    pub repartition_windows: bool,
 }
 
 impl Default for ExecutionConfig {
@@ -659,6 +662,7 @@ impl Default for ExecutionConfig {
             information_schema: false,
             repartition_joins: true,
             repartition_aggregations: true,
+            repartition_windows: true,
         }
     }
 }
@@ -749,11 +753,18 @@ impl ExecutionConfig {
         self.repartition_joins = enabled;
         self
     }
+
     /// Enables or disables the use of repartitioning for aggregations to improve parallelism
     pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
         self.repartition_aggregations = enabled;
         self
     }
+
+    /// Enables or disables the use of repartitioning for window functions to improve parallelism
+    pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
+        self.repartition_windows = enabled;
+        self
+    }
 }
 
 /// Holds per-execution properties and data (such as starting timestamps, etc).
diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs
index c3bb9a8..75f1565 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -44,7 +44,7 @@ use crate::physical_plan::{
 };
 use crate::prelude::JoinType;
 use crate::scalar::ScalarValue;
-use crate::sql::utils::generate_sort_key;
+use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys};
 use crate::variable::VarType;
 use crate::{
     error::{DataFusionError, Result},
@@ -264,6 +264,38 @@ impl DefaultPhysicalPlanner {
                         "Impossibly got empty window expression".to_owned(),
                     ));
                 }
+
+                let input_exec = self.create_initial_plan(input, ctx_state)?;
+
+                // at this moment we are guaranteed by the logical planner
+                // to have all the window_expr to have equal sort key
+                let partition_keys = window_expr_common_partition_keys(window_expr)?;
+
+                let can_repartition = !partition_keys.is_empty()
+                    && ctx_state.config.concurrency > 1
+                    && ctx_state.config.repartition_windows;
+
+                let input_exec = if can_repartition {
+                    let partition_keys = partition_keys
+                        .iter()
+                        .map(|e| {
+                            self.create_physical_expr(
+                                e,
+                                input.schema(),
+                                &input_exec.schema(),
+                                ctx_state,
+                            )
+                        })
+                        .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
+                    Arc::new(RepartitionExec::try_new(
+                        input_exec,
+                        Partitioning::Hash(partition_keys, ctx_state.config.concurrency),
+                    )?)
+                } else {
+                    input_exec
+                };
+
+                // add a sort phase
                 let get_sort_keys = |expr: &Expr| match expr {
                     Expr::WindowFunction {
                         ref partition_by,
@@ -272,7 +304,6 @@ impl DefaultPhysicalPlanner {
                     } => generate_sort_key(partition_by, order_by),
                     _ => unreachable!(),
                 };
-
                 let sort_keys = get_sort_keys(&window_expr[0]);
                 if window_expr.len() > 1 {
                     debug_assert!(
@@ -283,7 +314,6 @@ impl DefaultPhysicalPlanner {
                     );
                 }
 
-                let input_exec = self.create_initial_plan(input, ctx_state)?;
                 let logical_input_schema = input.schema();
 
                 let input_exec = if sort_keys.is_empty() {
@@ -310,7 +340,11 @@ impl DefaultPhysicalPlanner {
                             _ => unreachable!(),
                         })
                         .collect::<Result<Vec<_>>>()?;
-                    Arc::new(SortExec::try_new(sort_keys, input_exec)?)
+                    Arc::new(if can_repartition {
+                        SortExec::new_with_partitioning(sort_keys, input_exec, true)
+                    } else {
+                        SortExec::try_new(sort_keys, input_exec)?
+                    })
                 };
 
                 let physical_input_schema = input_exec.schema();
diff --git a/datafusion/src/physical_plan/windows.rs b/datafusion/src/physical_plan/windows.rs
index 8926376..cd603fd 100644
--- a/datafusion/src/physical_plan/windows.rs
+++ b/datafusion/src/physical_plan/windows.rs
@@ -404,11 +404,22 @@ impl ExecutionPlan for WindowAggExec {
 
     /// Get the output partitioning of this plan
     fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(1)
+        // because we can have repartitioning using the partition keys
+        // this would be either 1 or more than 1 depending on the presense of
+        // repartitioning
+        self.input.output_partitioning()
     }
 
     fn required_child_distribution(&self) -> Distribution {
-        Distribution::SinglePartition
+        if self
+            .window_expr()
+            .iter()
+            .all(|expr| expr.partition_by().is_empty())
+        {
+            Distribution::SinglePartition
+        } else {
+            Distribution::UnspecifiedDistribution
+        }
     }
 
     fn with_new_children(
@@ -428,22 +439,7 @@ impl ExecutionPlan for WindowAggExec {
     }
 
     async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
-        if 0 != partition {
-            return Err(DataFusionError::Internal(format!(
-                "WindowAggExec invalid partition {}",
-                partition
-            )));
-        }
-
-        // window needs to operate on a single partition currently
-        if 1 != self.input.output_partitioning().partition_count() {
-            return Err(DataFusionError::Internal(
-                "WindowAggExec requires a single input partition".to_owned(),
-            ));
-        }
-
         let input = self.input.execute(partition).await?;
-
         let stream = Box::pin(WindowAggStream::new(
             self.schema.clone(),
             self.window_expr.clone(),
@@ -581,38 +577,6 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn window_function_input_partition() -> Result<()> {
-        let (input, schema) = create_test_schema(4)?;
-
-        let window_exec = Arc::new(WindowAggExec::try_new(
-            vec![create_window_expr(
-                &WindowFunction::AggregateFunction(AggregateFunction::Count),
-                "count".to_owned(),
-                &[col("c3", &schema)?],
-                &[],
-                &[],
-                Some(WindowFrame::default()),
-                schema.as_ref(),
-            )?],
-            input,
-            schema.clone(),
-        )?);
-
-        let result = collect(window_exec).await;
-
-        assert!(result.is_err());
-        if let Some(DataFusionError::Internal(msg)) = result.err() {
-            assert_eq!(
-                msg,
-                "WindowAggExec requires a single input partition".to_owned()
-            );
-        } else {
-            unreachable!("Expect an internal error to happen");
-        }
-        Ok(())
-    }
-
-    #[tokio::test]
     async fn window_function() -> Result<()> {
         let (input, schema) = create_test_schema(1)?;
 
diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs
index 080f84e..2824336 100644
--- a/datafusion/src/sql/utils.rs
+++ b/datafusion/src/sql/utils.rs
@@ -462,6 +462,30 @@ pub(crate) fn generate_sort_key(
     sort_key
 }
 
+/// given a slice of window expressions sharing the same sort key, find their common partition
+/// keys.
+pub(crate) fn window_expr_common_partition_keys(
+    window_exprs: &[Expr],
+) -> Result<&[Expr]> {
+    let all_partition_keys = window_exprs
+        .iter()
+        .map(|expr| match expr {
+            Expr::WindowFunction { partition_by, .. } => Ok(partition_by),
+            expr => Err(DataFusionError::Execution(format!(
+                "Impossibly got non-window expr {:?}",
+                expr
+            ))),
+        })
+        .collect::<Result<Vec<_>>>()?;
+    let result = all_partition_keys
+        .iter()
+        .min_by_key(|s| s.len())
+        .ok_or_else(|| {
+            DataFusionError::Execution("No window expressions found".to_owned())
+        })?;
+    Ok(result)
+}
+
 /// group a slice of window expression expr by their order by expressions
 pub(crate) fn group_window_expr_by_sort_keys(
     window_expr: &[Expr],

Reply via email to