This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new fddab22 Use repartition in window functions to speed up (#569)
fddab22 is described below
commit fddab22aa562750f67385a961497dc020b18c4b2
Author: Jiayu Liu <[email protected]>
AuthorDate: Thu Jul 1 05:00:03 2021 +0800
Use repartition in window functions to speed up (#569)
* implement window functions with partition by
* fix partition requirement
---
datafusion/src/execution/context.rs | 11 ++++++
datafusion/src/physical_plan/planner.rs | 42 +++++++++++++++++++---
datafusion/src/physical_plan/windows.rs | 62 +++++++--------------------------
datafusion/src/sql/utils.rs | 24 +++++++++++++
4 files changed, 86 insertions(+), 53 deletions(-)
diff --git a/datafusion/src/execution/context.rs
b/datafusion/src/execution/context.rs
index 5df8e20..436bce5 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -630,6 +630,9 @@ pub struct ExecutionConfig {
/// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
/// using the provided `concurrency` level
pub repartition_aggregations: bool,
+ /// Should DataFusion repartition data using the partition keys to execute window functions in
+ /// parallel using the provided `concurrency` level
+ pub repartition_windows: bool,
}
impl Default for ExecutionConfig {
@@ -659,6 +662,7 @@ impl Default for ExecutionConfig {
information_schema: false,
repartition_joins: true,
repartition_aggregations: true,
+ repartition_windows: true,
}
}
}
@@ -749,11 +753,18 @@ impl ExecutionConfig {
self.repartition_joins = enabled;
self
}
+
/// Enables or disables the use of repartitioning for aggregations to improve parallelism
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
self.repartition_aggregations = enabled;
self
}
+
+ /// Enables or disables the use of repartitioning for window functions to improve parallelism
+ pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
+ self.repartition_windows = enabled;
+ self
+ }
}
/// Holds per-execution properties and data (such as starting timestamps, etc).
diff --git a/datafusion/src/physical_plan/planner.rs
b/datafusion/src/physical_plan/planner.rs
index c3bb9a8..75f1565 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -44,7 +44,7 @@ use crate::physical_plan::{
};
use crate::prelude::JoinType;
use crate::scalar::ScalarValue;
-use crate::sql::utils::generate_sort_key;
+use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys};
use crate::variable::VarType;
use crate::{
error::{DataFusionError, Result},
@@ -264,6 +264,38 @@ impl DefaultPhysicalPlanner {
"Impossibly got empty window expression".to_owned(),
));
}
+
+ let input_exec = self.create_initial_plan(input, ctx_state)?;
+
+ // at this moment we are guaranteed by the logical planner
+ // to have all the window_expr share an equal sort key
+ let partition_keys = window_expr_common_partition_keys(window_expr)?;
+
+ let can_repartition = !partition_keys.is_empty()
+ && ctx_state.config.concurrency > 1
+ && ctx_state.config.repartition_windows;
+
+ let input_exec = if can_repartition {
+ let partition_keys = partition_keys
+ .iter()
+ .map(|e| {
+ self.create_physical_expr(
+ e,
+ input.schema(),
+ &input_exec.schema(),
+ ctx_state,
+ )
+ })
+ .collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
+ Arc::new(RepartitionExec::try_new(
+ input_exec,
+ Partitioning::Hash(partition_keys, ctx_state.config.concurrency),
+ )?)
+ } else {
+ input_exec
+ };
+
+ // add a sort phase
let get_sort_keys = |expr: &Expr| match expr {
Expr::WindowFunction {
ref partition_by,
@@ -272,7 +304,6 @@ impl DefaultPhysicalPlanner {
} => generate_sort_key(partition_by, order_by),
_ => unreachable!(),
};
-
let sort_keys = get_sort_keys(&window_expr[0]);
if window_expr.len() > 1 {
debug_assert!(
@@ -283,7 +314,6 @@ impl DefaultPhysicalPlanner {
);
}
- let input_exec = self.create_initial_plan(input, ctx_state)?;
let logical_input_schema = input.schema();
let input_exec = if sort_keys.is_empty() {
@@ -310,7 +340,11 @@ impl DefaultPhysicalPlanner {
_ => unreachable!(),
})
.collect::<Result<Vec<_>>>()?;
- Arc::new(SortExec::try_new(sort_keys, input_exec)?)
+ Arc::new(if can_repartition {
+ SortExec::new_with_partitioning(sort_keys, input_exec,
true)
+ } else {
+ SortExec::try_new(sort_keys, input_exec)?
+ })
};
let physical_input_schema = input_exec.schema();
diff --git a/datafusion/src/physical_plan/windows.rs
b/datafusion/src/physical_plan/windows.rs
index 8926376..cd603fd 100644
--- a/datafusion/src/physical_plan/windows.rs
+++ b/datafusion/src/physical_plan/windows.rs
@@ -404,11 +404,22 @@ impl ExecutionPlan for WindowAggExec {
/// Get the output partitioning of this plan
fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(1)
+ // because we can have repartitioning using the partition keys
+ // this would be either 1 or more than 1 depending on the presence of
+ // repartitioning
+ self.input.output_partitioning()
}
fn required_child_distribution(&self) -> Distribution {
- Distribution::SinglePartition
+ if self
+ .window_expr()
+ .iter()
+ .all(|expr| expr.partition_by().is_empty())
+ {
+ Distribution::SinglePartition
+ } else {
+ Distribution::UnspecifiedDistribution
+ }
}
fn with_new_children(
@@ -428,22 +439,7 @@ impl ExecutionPlan for WindowAggExec {
}
async fn execute(&self, partition: usize) ->
Result<SendableRecordBatchStream> {
- if 0 != partition {
- return Err(DataFusionError::Internal(format!(
- "WindowAggExec invalid partition {}",
- partition
- )));
- }
-
- // window needs to operate on a single partition currently
- if 1 != self.input.output_partitioning().partition_count() {
- return Err(DataFusionError::Internal(
- "WindowAggExec requires a single input partition".to_owned(),
- ));
- }
-
let input = self.input.execute(partition).await?;
-
let stream = Box::pin(WindowAggStream::new(
self.schema.clone(),
self.window_expr.clone(),
@@ -581,38 +577,6 @@ mod tests {
}
#[tokio::test]
- async fn window_function_input_partition() -> Result<()> {
- let (input, schema) = create_test_schema(4)?;
-
- let window_exec = Arc::new(WindowAggExec::try_new(
- vec![create_window_expr(
- &WindowFunction::AggregateFunction(AggregateFunction::Count),
- "count".to_owned(),
- &[col("c3", &schema)?],
- &[],
- &[],
- Some(WindowFrame::default()),
- schema.as_ref(),
- )?],
- input,
- schema.clone(),
- )?);
-
- let result = collect(window_exec).await;
-
- assert!(result.is_err());
- if let Some(DataFusionError::Internal(msg)) = result.err() {
- assert_eq!(
- msg,
- "WindowAggExec requires a single input partition".to_owned()
- );
- } else {
- unreachable!("Expect an internal error to happen");
- }
- Ok(())
- }
-
- #[tokio::test]
async fn window_function() -> Result<()> {
let (input, schema) = create_test_schema(1)?;
diff --git a/datafusion/src/sql/utils.rs b/datafusion/src/sql/utils.rs
index 080f84e..2824336 100644
--- a/datafusion/src/sql/utils.rs
+++ b/datafusion/src/sql/utils.rs
@@ -462,6 +462,30 @@ pub(crate) fn generate_sort_key(
sort_key
}
+/// given a slice of window expressions sharing the same sort key, find their common partition
+/// keys.
+pub(crate) fn window_expr_common_partition_keys(
+ window_exprs: &[Expr],
+) -> Result<&[Expr]> {
+ let all_partition_keys = window_exprs
+ .iter()
+ .map(|expr| match expr {
+ Expr::WindowFunction { partition_by, .. } => Ok(partition_by),
+ expr => Err(DataFusionError::Execution(format!(
+ "Impossibly got non-window expr {:?}",
+ expr
+ ))),
+ })
+ .collect::<Result<Vec<_>>>()?;
+ let result = all_partition_keys
+ .iter()
+ .min_by_key(|s| s.len())
+ .ok_or_else(|| {
+ DataFusionError::Execution("No window expressions found".to_owned())
+ })?;
+ Ok(result)
+}
+
/// group a slice of window expression expr by their order by expressions
pub(crate) fn group_window_expr_by_sort_keys(
window_expr: &[Expr],