This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 741f8f80fd Treat Partition by columns as set for window functions
(#5951)
741f8f80fd is described below
commit 741f8f80fd802b92049515035aff439247b802a4
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Apr 13 21:02:17 2023 +0300
Treat Partition by columns as set for window functions (#5951)
* Change required input ordering to format to not absolutely require
direction.
* Treat partition by columns as set
* simplifications
* simplifications
* simplifications
* simplifications
* change place of util function, bug fix get_ordered_partition_by_indices
* minor changes
* Simplifications, clone removals
* update comment
* add new test cases
* Address reviews
* address reviews
* resolve clippy errors
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion/common/src/utils.rs | 55 +++-
.../src/physical_optimizer/sort_enforcement.rs | 298 +++++++++++++++------
datafusion/core/src/physical_optimizer/utils.rs | 114 +++++++-
.../windows/bounded_window_agg_exec.rs | 65 +++--
datafusion/core/src/physical_plan/windows/mod.rs | 117 ++++++--
.../src/physical_plan/windows/window_agg_exec.rs | 60 +++--
datafusion/core/src/test/mod.rs | 29 +-
.../core/tests/sqllogictests/test_files/window.slt | 109 +++++++-
datafusion/physical-expr/src/utils.rs | 109 ++++++--
9 files changed, 796 insertions(+), 160 deletions(-)
diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 9b2604d2ee..2f468ff9c3 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -27,7 +27,7 @@ use sqlparser::ast::Ident;
use sqlparser::dialect::GenericDialect;
use sqlparser::parser::{Parser, ParserError};
use sqlparser::tokenizer::{Token, TokenWithLocation};
-use std::borrow::Cow;
+use std::borrow::{Borrow, Cow};
use std::cmp::Ordering;
use std::ops::Range;
@@ -292,10 +292,45 @@ pub(crate) fn parse_identifiers_normalized(s: &str) ->
Vec<String> {
.collect::<Vec<_>>()
}
+/// This function "takes" the elements at `indices` from the slice `items`.
+pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
+ items: &[T],
+ indices: impl IntoIterator<Item = I>,
+) -> Result<Vec<T>> {
+ indices
+ .into_iter()
+ .map(|idx| items.get(*idx.borrow()).cloned())
+ .collect::<Option<Vec<T>>>()
+ .ok_or_else(|| {
+ DataFusionError::Execution(
+ "Expects indices to be in the range of searched
vector".to_string(),
+ )
+ })
+}
+
+/// This function finds the longest prefix of the form 0, 1, 2, ... within the
+/// collection `sequence`. Examples:
+/// - For 0, 1, 2, 4, 5; we would produce 3, meaning 0, 1, 2 is the longest
satisfying
+/// prefix.
+/// - For 1, 2, 3, 4; we would produce 0, meaning there is no such prefix.
+pub fn longest_consecutive_prefix<T: Borrow<usize>>(
+ sequence: impl IntoIterator<Item = T>,
+) -> usize {
+ let mut count = 0;
+ for item in sequence {
+ if !count.eq(item.borrow()) {
+ break;
+ }
+ count += 1;
+ }
+ count
+}
+
#[cfg(test)]
mod tests {
use arrow::array::Float64Array;
use arrow_array::Array;
+ use std::ops::Range;
use std::sync::Arc;
use crate::from_slice::FromSlice;
@@ -633,4 +668,22 @@ mod tests {
}
Ok(())
}
+
+ #[test]
+ fn test_get_at_indices() -> Result<()> {
+ let in_vec = vec![1, 2, 3, 4, 5, 6, 7];
+ assert_eq!(get_at_indices(&in_vec, [0, 2])?, vec![1, 3]);
+ assert_eq!(get_at_indices(&in_vec, [4, 2])?, vec![5, 3]);
+ // 7 is outside the range
+ assert!(get_at_indices(&in_vec, [7]).is_err());
+ Ok(())
+ }
+
+ #[test]
+ fn test_longest_consecutive_prefix() {
+ assert_eq!(longest_consecutive_prefix([0, 3, 4]), 1);
+ assert_eq!(longest_consecutive_prefix([0, 1, 3, 4]), 2);
+ assert_eq!(longest_consecutive_prefix([0, 1, 2, 3, 4]), 5);
+ assert_eq!(longest_consecutive_prefix([1, 2, 3, 4]), 0);
+ }
}
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 76628fdcb0..71714e65d5 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -37,8 +37,9 @@ use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown};
use crate::physical_optimizer::utils::{
- add_sort_above, is_coalesce_partitions, is_limit, is_repartition, is_sort,
- is_sort_preserving_merge, is_union, is_window,
+ add_sort_above, find_indices, is_coalesce_partitions, is_limit,
is_repartition,
+ is_sort, is_sort_preserving_merge, is_sorted, is_union, is_window,
+ merge_and_order_indices, set_difference,
};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -48,13 +49,14 @@ use crate::physical_plan::windows::{BoundedWindowAggExec,
WindowAggExec};
use crate::physical_plan::{with_new_children_if_necessary, Distribution,
ExecutionPlan};
use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
+use datafusion_common::utils::{get_at_indices, longest_consecutive_prefix};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::utils::{
- ordering_satisfy, ordering_satisfy_requirement_concrete,
+ convert_to_expr, get_indices_of_matching_exprs, ordering_satisfy,
+ ordering_satisfy_requirement_concrete,
};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr,
PhysicalSortRequirement};
use itertools::{concat, izip};
-use std::iter::zip;
use std::sync::Arc;
/// This rule inspects `SortExec`'s in the given physical plan and removes the
@@ -582,40 +584,24 @@ fn analyze_window_sort_removal(
"Expects to receive either WindowAggExec of
BoundedWindowAggExec".to_string(),
));
};
- let n_req = window_exec.required_input_ordering()[0]
- .as_ref()
- .map(|elem| elem.len())
- .unwrap_or(0);
let mut needs_reverse = None;
for sort_any in sort_tree.get_leaves() {
- let sort_output_ordering = sort_any.output_ordering();
// Variable `sort_any` will either be a `SortExec` or a
// `SortPreservingMergeExec`, and both have a single child.
// Therefore, we can use the 0th index without loss of generality.
let sort_input = sort_any.children()[0].clone();
- let physical_ordering = sort_input.output_ordering();
- let sort_output_ordering = sort_output_ordering.ok_or_else(|| {
- DataFusionError::Plan("A SortExec should have output
ordering".to_string())
- })?;
- // It is enough to check whether the first "n_req" elements of the sort
- // output satisfy window_exec's requirement as it is only "n_req" long.
- let required_ordering = &sort_output_ordering[0..n_req];
- if let Some(physical_ordering) = physical_ordering {
- if let Some(should_reverse) = can_skip_sort(
- window_expr[0].partition_by(),
- required_ordering,
- &sort_input.schema(),
- physical_ordering,
- )? {
- if should_reverse ==
*needs_reverse.get_or_insert(should_reverse) {
- continue;
- }
+ if let Some(should_reverse) = can_skip_sort(
+ window_expr[0].partition_by(),
+ window_expr[0].order_by(),
+ &sort_input,
+ )? {
+ if should_reverse == *needs_reverse.get_or_insert(should_reverse) {
+ continue;
}
}
- // If there is no physical ordering, or we can not skip the sort, or
- // window reversal requirements are not uniform; then there is no
- // opportunity for a sort removal -- we immediately return.
+ // We can not skip the sort, or window reversal requirements are not
+ // uniform; then sort removal is not possible -- we immediately return.
return Ok(None);
}
let new_window_expr = if needs_reverse.unwrap() {
@@ -776,67 +762,108 @@ fn get_sort_exprs(sort_any: &Arc<dyn ExecutionPlan>) ->
Result<&[PhysicalSortExp
}
}
-#[derive(Debug)]
-/// This structure stores extra column information required to remove
unnecessary sorts.
-pub struct ColumnInfo {
- reverse: bool,
- is_partition: bool,
-}
-
/// Compares physical ordering and required ordering of all `PhysicalSortExpr`s
/// to decide whether a `SortExec` before a `WindowAggExec` can be removed.
/// A `None` return value indicates that we can remove the sort in question.
/// A `Some(bool)` value indicates otherwise, and signals whether we need to
/// reverse the ordering in order to remove the sort in question.
-pub fn can_skip_sort(
- partition_keys: &[Arc<dyn PhysicalExpr>],
- required: &[PhysicalSortExpr],
- input_schema: &SchemaRef,
- physical_ordering: &[PhysicalSortExpr],
+fn can_skip_sort(
+ partitionby_exprs: &[Arc<dyn PhysicalExpr>],
+ orderby_keys: &[PhysicalSortExpr],
+ input: &Arc<dyn ExecutionPlan>,
) -> Result<Option<bool>> {
- if required.len() > physical_ordering.len() {
+ let physical_ordering = if let Some(physical_ordering) =
input.output_ordering() {
+ physical_ordering
+ } else {
+ // If there is no physical ordering, there is no way to remove a
+ // sort, so immediately return.
+ return Ok(None);
+ };
+ let orderby_exprs = convert_to_expr(orderby_keys);
+ let physical_ordering_exprs = convert_to_expr(physical_ordering);
+ let equal_properties = || input.equivalence_properties();
+ // indices of the order by expressions among input ordering expressions
+ let ob_indices = get_indices_of_matching_exprs(
+ &orderby_exprs,
+ &physical_ordering_exprs,
+ equal_properties,
+ );
+ let contains_all_orderbys = ob_indices.len() == orderby_exprs.len();
+ if !contains_all_orderbys {
+ // If all order by expressions are not in the input ordering. There is
no way to remove a sort
+ // immediately return
return Ok(None);
}
- let mut col_infos = vec![];
- for (sort_expr, physical_expr) in zip(required, physical_ordering) {
- let column = sort_expr.expr.clone();
- let is_partition = partition_keys.iter().any(|e| e.eq(&column));
- if let Some(reverse) = check_alignment(input_schema, physical_expr,
sort_expr)? {
- col_infos.push(ColumnInfo {
- reverse,
- is_partition,
- });
- } else {
- return Ok(None);
- }
+ // indices of the partition by expressions among input ordering expressions
+ let mut pb_indices = get_indices_of_matching_exprs(
+ partitionby_exprs,
+ &physical_ordering_exprs,
+ equal_properties,
+ );
+ let ordered_merged_indices = merge_and_order_indices(&pb_indices,
&ob_indices);
+ // Indices of order by columns that doesn't seen in partition by
+ // Equivalently (Order by columns) ∖ (Partition by columns) where `∖`
represents set difference.
+ let unique_ob_indices = set_difference(&ob_indices, &pb_indices);
+ if !is_sorted(&unique_ob_indices) {
+ // ORDER BY indices should be ascending ordered
+ return Ok(None);
}
- let partition_by_sections = col_infos
- .iter()
- .filter(|c| c.is_partition)
- .collect::<Vec<_>>();
- let can_skip_partition_bys = if partition_by_sections.is_empty() {
- true
+ let first_n = longest_consecutive_prefix(ordered_merged_indices);
+ let furthest_ob_index = *unique_ob_indices.last().unwrap_or(&0);
+ // Cannot skip sort if last order by index is not within consecutive
prefix.
+ // For instance, if input is ordered by a,b,c,d
+ // for expression `PARTITION BY a, ORDER BY b, d`, `first_n` would be 2
(meaning a, b defines a prefix for input ordering)
+ // Whereas `furthest_ob_index` would be 3 (column d occurs at the 3rd
index of the existing ordering.)
+ // Hence existing ordering is not sufficient to run current Executor.
+ // However, for expression `PARTITION BY a, ORDER BY b, c, d`, `first_n`
would be 4 (meaning a, b, c, d defines a prefix for input ordering)
+ // Similarly, `furthest_ob_index` would be 3 (column d occurs at the 3rd
index of the existing ordering.)
+ // Hence existing ordering would be sufficient to run current Executor.
+ if first_n <= furthest_ob_index {
+ return Ok(None);
+ }
+ let input_orderby_columns = get_at_indices(physical_ordering,
&unique_ob_indices)?;
+ let expected_orderby_columns =
+ get_at_indices(orderby_keys, find_indices(&ob_indices,
unique_ob_indices)?)?;
+ let should_reverse = if let Some(should_reverse) = check_alignments(
+ &input.schema(),
+ &input_orderby_columns,
+ &expected_orderby_columns,
+ )? {
+ should_reverse
} else {
- let first_reverse = partition_by_sections[0].reverse;
- let can_skip_partition_bys = partition_by_sections
- .iter()
- .all(|c| c.reverse == first_reverse);
- can_skip_partition_bys
+ // If ordering directions are not aligned. We cannot calculate result
without changing existing ordering.
+ return Ok(None);
};
- let order_by_sections = col_infos
- .iter()
- .filter(|elem| !elem.is_partition)
- .collect::<Vec<_>>();
- let (can_skip_order_bys, should_reverse_order_bys) = if
order_by_sections.is_empty() {
- (true, false)
+
+ // Determine how many elements in the partition by columns defines a
consecutive range from zero.
+ pb_indices.sort();
+ let first_n = longest_consecutive_prefix(pb_indices);
+ if first_n < partitionby_exprs.len() {
+ return Ok(None);
+ }
+ Ok(Some(should_reverse))
+}
+
+fn check_alignments(
+ schema: &SchemaRef,
+ physical_ordering: &[PhysicalSortExpr],
+ required: &[PhysicalSortExpr],
+) -> Result<Option<bool>> {
+ let res = izip!(physical_ordering, required)
+ .map(|(lhs, rhs)| check_alignment(schema, lhs, rhs))
+ .collect::<Result<Option<Vec<_>>>>()?;
+ Ok(if let Some(res) = res {
+ if !res.is_empty() {
+ let first = res[0];
+ let all_same = res.into_iter().all(|elem| elem == first);
+ all_same.then_some(first)
+ } else {
+ Some(false)
+ }
} else {
- let first_reverse = order_by_sections[0].reverse;
- let can_skip_order_bys =
- order_by_sections.iter().all(|c| c.reverse == first_reverse);
- (can_skip_order_bys, first_reverse)
- };
- let can_skip = can_skip_order_bys && can_skip_partition_bys;
- Ok(can_skip.then_some(should_reverse_order_bys))
+ // Cannot skip some of the requirements in the input.
+ None
+ })
}
/// Compares `physical_ordering` and `required` ordering, decides whether
@@ -883,6 +910,7 @@ mod tests {
use crate::physical_plan::windows::create_window_expr;
use crate::physical_plan::{displayable, Partitioning};
use crate::prelude::SessionContext;
+ use crate::test::csv_exec_sorted;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{Result, Statistics};
@@ -909,6 +937,17 @@ mod tests {
Ok(schema)
}
+ // Generate a schema which consists of 5 columns (a, b, c, d, e)
+ fn create_test_schema3() -> Result<SchemaRef> {
+ let a = Field::new("a", DataType::Int32, true);
+ let b = Field::new("b", DataType::Int32, false);
+ let c = Field::new("c", DataType::Int32, true);
+ let d = Field::new("d", DataType::Int32, false);
+ let e = Field::new("e", DataType::Int32, false);
+ let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+ Ok(schema)
+ }
+
// Util function to get string representation of a physical plan
fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
let formatted = displayable(plan.as_ref()).indent().to_string();
@@ -993,6 +1032,109 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_can_skip_ordering_exhaustive() -> Result<()> {
+ let test_schema = create_test_schema3()?;
+ // Columns a,c are nullable whereas b,d are not nullable.
+ // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC
NULLS FIRST, d ASC NULLS FIRST
+ // Column e is not ordered.
+ let sort_exprs = vec![
+ sort_expr("a", &test_schema),
+ sort_expr("b", &test_schema),
+ sort_expr("c", &test_schema),
+ sort_expr("d", &test_schema),
+ ];
+ let exec_unbounded = csv_exec_sorted(&test_schema, sort_exprs, true);
+
+ // test cases consists of vector of tuples. Where each tuple
represents a single test case.
+ // First field in the tuple is Vec<str> where each element in the
vector represents PARTITION BY columns
+ // For instance `vec!["a", "b"]` corresponds to PARTITION BY a, b
+ // Second field in the tuple is Vec<str> where each element in the
vector represents ORDER BY columns
+ // For instance, vec!["c"], corresponds to ORDER BY c ASC NULLS FIRST,
(ordering is default ordering. We do not check
+ // for reversibility in this test).
+ // Third field in the tuple is Option<bool>, which corresponds to
expected result for `can_skip_sort` function.
+ // None represents that existing ordering is not sufficient to run
executor.
+ // (We need to add SortExec to be able to run it).
+ // Some(bool) represents, we can run algorithm with existing ordering.
If bool is `true`. We should reverse, window expressions
+ // if bool is `false`. Existing window expression can be used as is.
+ let test_cases = vec![
+ (vec!["a"], vec!["a"], Some(false)),
+ (vec!["a"], vec!["b"], Some(false)),
+ (vec!["a"], vec!["c"], None),
+ (vec!["a"], vec!["a", "b"], Some(false)),
+ (vec!["a"], vec!["b", "c"], Some(false)),
+ (vec!["a"], vec!["a", "c"], None),
+ (vec!["a"], vec!["a", "b", "c"], Some(false)),
+ (vec!["b"], vec!["a"], None),
+ (vec!["b"], vec!["b"], None),
+ (vec!["b"], vec!["c"], None),
+ (vec!["b"], vec!["a", "b"], None),
+ (vec!["b"], vec!["b", "c"], None),
+ (vec!["b"], vec!["a", "c"], None),
+ (vec!["b"], vec!["a", "b", "c"], None),
+ (vec!["c"], vec!["a"], None),
+ (vec!["c"], vec!["b"], None),
+ (vec!["c"], vec!["c"], None),
+ (vec!["c"], vec!["a", "b"], None),
+ (vec!["c"], vec!["b", "c"], None),
+ (vec!["c"], vec!["a", "c"], None),
+ (vec!["c"], vec!["a", "b", "c"], None),
+ (vec!["b", "a"], vec!["a"], Some(false)),
+ (vec!["b", "a"], vec!["b"], Some(false)),
+ (vec!["b", "a"], vec!["c"], Some(false)),
+ (vec!["b", "a"], vec!["a", "b"], Some(false)),
+ (vec!["b", "a"], vec!["b", "c"], Some(false)),
+ (vec!["b", "a"], vec!["a", "c"], Some(false)),
+ (vec!["b", "a"], vec!["a", "b", "c"], Some(false)),
+ (vec!["c", "b"], vec!["a"], None),
+ (vec!["c", "b"], vec!["b"], None),
+ (vec!["c", "b"], vec!["c"], None),
+ (vec!["c", "b"], vec!["a", "b"], None),
+ (vec!["c", "b"], vec!["b", "c"], None),
+ (vec!["c", "b"], vec!["a", "c"], None),
+ (vec!["c", "b"], vec!["a", "b", "c"], None),
+ (vec!["c", "a"], vec!["a"], None),
+ (vec!["c", "a"], vec!["b"], None),
+ (vec!["c", "a"], vec!["c"], None),
+ (vec!["c", "a"], vec!["a", "b"], None),
+ (vec!["c", "a"], vec!["b", "c"], None),
+ (vec!["c", "a"], vec!["a", "c"], None),
+ (vec!["c", "a"], vec!["a", "b", "c"], None),
+ (vec!["c", "b", "a"], vec!["a"], Some(false)),
+ (vec!["c", "b", "a"], vec!["b"], Some(false)),
+ (vec!["c", "b", "a"], vec!["c"], Some(false)),
+ (vec!["c", "b", "a"], vec!["a", "b"], Some(false)),
+ (vec!["c", "b", "a"], vec!["b", "c"], Some(false)),
+ (vec!["c", "b", "a"], vec!["a", "c"], Some(false)),
+ (vec!["c", "b", "a"], vec!["a", "b", "c"], Some(false)),
+ ];
+ for (case_idx, test_case) in test_cases.iter().enumerate() {
+ let (partition_by_columns, order_by_params, expected) = &test_case;
+ let mut partition_by_exprs = vec![];
+ for col_name in partition_by_columns {
+ partition_by_exprs.push(col(col_name, &test_schema)?);
+ }
+
+ let mut order_by_exprs = vec![];
+ for col_name in order_by_params {
+ let expr = col(col_name, &test_schema)?;
+ // Give default ordering, this is same with input ordering
direction
+ // In this test we do check for reversibility.
+ let options = SortOptions::default();
+ order_by_exprs.push(PhysicalSortExpr { expr, options });
+ }
+ let res =
+ can_skip_sort(&partition_by_exprs, &order_by_exprs,
&exec_unbounded)?;
+ assert_eq!(
+ res, *expected,
+ "Unexpected result for in unbounded test case#: {:?}, case:
{:?}",
+ case_idx, test_case
+ );
+ }
+
+ Ok(())
+ }
+
/// Runs the sort enforcement optimizer and asserts the plan
/// against the original and expected plans
///
diff --git a/datafusion/core/src/physical_optimizer/utils.rs
b/datafusion/core/src/physical_optimizer/utils.rs
index 06bef0fbda..b20ff556c8 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -19,6 +19,10 @@
use super::optimizer::PhysicalOptimizerRule;
+use std::borrow::Borrow;
+use std::collections::HashSet;
+use std::sync::Arc;
+
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -30,9 +34,9 @@ use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_common::tree_node::Transformed;
+use datafusion_common::DataFusionError;
use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_expr::PhysicalSortExpr;
-use std::sync::Arc;
/// Convenience rule for writing optimizers: recursively invoke
/// optimize on plan's children and then return a node of the same
@@ -77,6 +81,64 @@ pub fn add_sort_above(
Ok(())
}
+/// Find indices of each element in `targets` inside `items`. If one of the
+/// elements is absent in `items`, returns an error.
+pub fn find_indices<T: PartialEq, S: Borrow<T>>(
+ items: &[T],
+ targets: impl IntoIterator<Item = S>,
+) -> Result<Vec<usize>> {
+ targets
+ .into_iter()
+ .map(|target| items.iter().position(|e| target.borrow().eq(e)))
+ .collect::<Option<_>>()
+ .ok_or_else(|| DataFusionError::Execution("Target not
found".to_string()))
+}
+
+/// Merges collections `first` and `second`, removes duplicates and sorts the
+/// result, returning it as a [`Vec`].
+pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
+ first: impl IntoIterator<Item = T>,
+ second: impl IntoIterator<Item = S>,
+) -> Vec<usize> {
+ let mut result: Vec<_> = first
+ .into_iter()
+ .map(|e| *e.borrow())
+ .chain(second.into_iter().map(|e| *e.borrow()))
+ .collect::<HashSet<_>>()
+ .into_iter()
+ .collect();
+ result.sort();
+ result
+}
+
+/// Checks whether the given index sequence is monotonically non-decreasing.
+pub fn is_sorted<T: Borrow<usize>>(sequence: impl IntoIterator<Item = T>) ->
bool {
+ // TODO: Remove this function when `is_sorted` graduates from Rust nightly.
+ let mut previous = 0;
+ for item in sequence.into_iter() {
+ let current = *item.borrow();
+ if current < previous {
+ return false;
+ }
+ previous = current;
+ }
+ true
+}
+
+/// Calculates the set difference between sequences `first` and `second`,
+/// returning the result as a [`Vec`]. Preserves the ordering of `first`.
+pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
+ first: impl IntoIterator<Item = T>,
+ second: impl IntoIterator<Item = S>,
+) -> Vec<usize> {
+ let set: HashSet<_> = second.into_iter().map(|e| *e.borrow()).collect();
+ first
+ .into_iter()
+ .map(|e| *e.borrow())
+ .filter(|e| !set.contains(e))
+ .collect()
+}
+
/// Checks whether the given operator is a limit;
/// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`].
pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
@@ -113,3 +175,53 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<RepartitionExec>()
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[tokio::test]
+ async fn test_find_indices() -> Result<()> {
+ assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
+ assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
+ assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
+ assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
+ assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_merge_and_order_indices() {
+ assert_eq!(
+ merge_and_order_indices([0, 3, 4], [1, 3, 5]),
+ vec![0, 1, 3, 4, 5]
+ );
+ // Result should be ordered, even if inputs are not
+ assert_eq!(
+ merge_and_order_indices([3, 0, 4], [5, 1, 3]),
+ vec![0, 1, 3, 4, 5]
+ );
+ }
+
+ #[tokio::test]
+ async fn test_is_sorted() {
+ assert!(is_sorted::<usize>([]));
+ assert!(is_sorted([0]));
+ assert!(is_sorted([0, 3, 4]));
+ assert!(is_sorted([0, 1, 2]));
+ assert!(is_sorted([0, 1, 4]));
+ assert!(is_sorted([0usize; 0]));
+ assert!(is_sorted([1, 2]));
+ assert!(!is_sorted([3, 2]));
+ }
+
+ #[tokio::test]
+ async fn test_set_difference() {
+ assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
+ assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
+ // return value should have same ordering with the in1
+ assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
+ assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
+ assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
+ }
+}
diff --git
a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index cc35541fc6..a8d02bcd42 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -46,8 +46,10 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use crate::physical_plan::windows::calc_requirements;
-use datafusion_common::utils::evaluate_partition_ranges;
+use crate::physical_plan::windows::{
+ calc_requirements, get_ordered_partition_by_indices,
+};
+use datafusion_common::utils::{evaluate_partition_ranges, get_at_indices};
use datafusion_physical_expr::window::{
PartitionBatchState, PartitionBatches, PartitionKey,
PartitionWindowAggStates,
WindowAggState, WindowState,
@@ -73,6 +75,9 @@ pub struct BoundedWindowAggExec {
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
+ /// Partition by indices that defines preset for existing ordering
+ // see `get_ordered_partition_by_indices` for more details.
+ ordered_partition_by_indices: Vec<usize>,
}
impl BoundedWindowAggExec {
@@ -85,6 +90,9 @@ impl BoundedWindowAggExec {
) -> Result<Self> {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
+
+ let ordered_partition_by_indices =
+ get_ordered_partition_by_indices(window_expr[0].partition_by(),
&input);
Ok(Self {
input,
window_expr,
@@ -92,6 +100,7 @@ impl BoundedWindowAggExec {
input_schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
+ ordered_partition_by_indices,
})
}
@@ -116,20 +125,9 @@ impl BoundedWindowAggExec {
// Hence returned `PhysicalSortExpr` corresponding to `PARTITION BY`
columns can be used safely
// to calculate partition separation points
pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
- let mut result = vec![];
- // All window exprs have the same partition by, so we just use the
first one:
- let partition_by = self.window_expr()[0].partition_by();
+ // Partition by sort keys indices are stored in
self.ordered_partition_by_indices.
let sort_keys = self.input.output_ordering().unwrap_or(&[]);
- for item in partition_by {
- if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
- result.push(a.clone());
- } else {
- return Err(DataFusionError::Internal(
- "Partition key not found in sort keys".to_string(),
- ));
- }
- }
- Ok(result)
+ get_at_indices(sort_keys, &self.ordered_partition_by_indices)
}
}
@@ -166,8 +164,15 @@ impl ExecutionPlan for BoundedWindowAggExec {
fn required_input_ordering(&self) ->
Vec<Option<Vec<PhysicalSortRequirement>>> {
let partition_bys = self.window_expr()[0].partition_by();
let order_keys = self.window_expr()[0].order_by();
- let requirements = calc_requirements(partition_bys, order_keys);
- vec![requirements]
+ if self.ordered_partition_by_indices.len() < partition_bys.len() {
+ vec![calc_requirements(partition_bys, order_keys)]
+ } else {
+ let partition_bys = self
+ .ordered_partition_by_indices
+ .iter()
+ .map(|idx| &partition_bys[*idx]);
+ vec![calc_requirements(partition_bys, order_keys)]
+ }
}
fn required_input_distribution(&self) -> Vec<Distribution> {
@@ -211,7 +216,8 @@ impl ExecutionPlan for BoundedWindowAggExec {
input,
BaselineMetrics::new(&self.metrics, partition),
self.partition_by_sort_keys()?,
- ));
+ self.ordered_partition_by_indices.clone(),
+ )?);
Ok(stream)
}
@@ -319,6 +325,7 @@ pub struct SortedPartitionByBoundedWindowStream {
window_expr: Vec<Arc<dyn WindowExpr>>,
partition_by_sort_keys: Vec<PhysicalSortExpr>,
baseline_metrics: BaselineMetrics,
+ ordered_partition_by_indices: Vec<usize>,
}
impl PartitionByHandler for SortedPartitionByBoundedWindowStream {
@@ -429,10 +436,19 @@ impl SortedPartitionByBoundedWindowStream {
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
partition_by_sort_keys: Vec<PhysicalSortExpr>,
- ) -> Self {
+ ordered_partition_by_indices: Vec<usize>,
+ ) -> Result<Self> {
let state = window_expr.iter().map(|_| IndexMap::new()).collect();
let empty_batch = RecordBatch::new_empty(schema.clone());
- Self {
+
+ // In BoundedWindowAggExec all partition by columns should be ordered.
+ if window_expr[0].partition_by().len() !=
ordered_partition_by_indices.len() {
+ return Err(DataFusionError::Internal(
+ "All partition by columns should have an ordering".to_string(),
+ ));
+ }
+
+ Ok(Self {
schema,
input,
input_buffer: empty_batch,
@@ -442,7 +458,8 @@ impl SortedPartitionByBoundedWindowStream {
window_expr,
baseline_metrics,
partition_by_sort_keys,
- }
+ ordered_partition_by_indices,
+ })
}
fn compute_aggregates(&mut self) -> Result<RecordBatch> {
@@ -621,10 +638,10 @@ impl SortedPartitionByBoundedWindowStream {
/// Get Partition Columns
pub fn partition_columns(&self, batch: &RecordBatch) ->
Result<Vec<SortColumn>> {
- self.partition_by_sort_keys
+ self.ordered_partition_by_indices
.iter()
- .map(|e| e.evaluate_to_sort_column(batch))
- .collect::<Result<Vec<_>>>()
+ .map(|idx|
self.partition_by_sort_keys[*idx].evaluate_to_sort_column(batch))
+ .collect()
}
}
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs
b/datafusion/core/src/physical_plan/windows/mod.rs
index d418e4b3d5..eea0749e94 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -25,7 +25,7 @@ use crate::physical_plan::{
PhysicalSortExpr, RowNumber,
},
type_coercion::coerce,
- udaf, PhysicalExpr,
+ udaf, ExecutionPlan, PhysicalExpr,
};
use crate::scalar::ScalarValue;
use arrow::datatypes::Schema;
@@ -36,6 +36,7 @@ use datafusion_expr::{
use datafusion_physical_expr::window::{
BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr,
};
+use std::borrow::Borrow;
use std::convert::TryInto;
use std::sync::Arc;
@@ -43,6 +44,8 @@ mod bounded_window_agg_exec;
mod window_agg_exec;
pub use bounded_window_agg_exec::BoundedWindowAggExec;
+use datafusion_common::utils::longest_consecutive_prefix;
+use datafusion_physical_expr::utils::{convert_to_expr,
get_indices_of_matching_exprs};
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
@@ -188,24 +191,56 @@ fn create_built_in_window_expr(
})
}
-pub(crate) fn calc_requirements(
- partition_by_exprs: &[Arc<dyn PhysicalExpr>],
- orderby_sort_exprs: &[PhysicalSortExpr],
+pub(crate) fn calc_requirements<
+ T: Borrow<Arc<dyn PhysicalExpr>>,
+ S: Borrow<PhysicalSortExpr>,
+>(
+ partition_by_exprs: impl IntoIterator<Item = T>,
+ orderby_sort_exprs: impl IntoIterator<Item = S>,
) -> Option<Vec<PhysicalSortRequirement>> {
- let mut sort_reqs = vec![];
- for partition_by in partition_by_exprs {
- sort_reqs.push(PhysicalSortRequirement::new(partition_by.clone(),
None))
- }
- for sort_expr in orderby_sort_exprs {
- let contains = sort_reqs.iter().any(|e| sort_expr.expr.eq(e.expr()));
- if !contains {
- sort_reqs.push(PhysicalSortRequirement::from(sort_expr.clone()));
+ let mut sort_reqs = partition_by_exprs
+ .into_iter()
+ .map(|partition_by| {
+ PhysicalSortRequirement::new(partition_by.borrow().clone(), None)
+ })
+ .collect::<Vec<_>>();
+ for element in orderby_sort_exprs.into_iter() {
+ let PhysicalSortExpr { expr, options } = element.borrow();
+ if !sort_reqs.iter().any(|e| e.expr().eq(expr)) {
+ sort_reqs.push(PhysicalSortRequirement::new(expr.clone(),
Some(*options)));
}
}
// Convert empty result to None. Otherwise wrap result inside Some()
(!sort_reqs.is_empty()).then_some(sort_reqs)
}
+/// This function calculates the indices such that, when the partition by
expressions are reordered with these indices,
+/// the resulting expressions define a prefix of the existing ordering.
+// For instance, if the input is ordered by a, b, c and PARTITION BY b, a is
used,
+// this vector will be [1, 0]. It means that when we iterate the b, a columns
with the order [1, 0],
+// the resulting vector (a, b) is a prefix of the existing ordering (a, b, c).
+pub(crate) fn get_ordered_partition_by_indices(
+ partition_by_exprs: &[Arc<dyn PhysicalExpr>],
+ input: &Arc<dyn ExecutionPlan>,
+) -> Vec<usize> {
+ let input_ordering = input.output_ordering().unwrap_or(&[]);
+ let input_ordering_exprs = convert_to_expr(input_ordering);
+ let equal_properties = || input.equivalence_properties();
+ let input_places = get_indices_of_matching_exprs(
+ &input_ordering_exprs,
+ partition_by_exprs,
+ equal_properties,
+ );
+ let mut partition_places = get_indices_of_matching_exprs(
+ partition_by_exprs,
+ &input_ordering_exprs,
+ equal_properties,
+ );
+ partition_places.sort();
+ let first_n = longest_consecutive_prefix(partition_places);
+ input_places[0..first_n].to_vec()
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -215,7 +250,7 @@ mod tests {
use crate::physical_plan::{collect, ExecutionPlan};
use crate::prelude::SessionContext;
use crate::test::exec::{assert_strong_count_converges_to_zero,
BlockingExec};
- use crate::test::{self, assert_is_pending};
+ use crate::test::{self, assert_is_pending, csv_exec_sorted};
use arrow::array::*;
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaRef};
@@ -235,10 +270,62 @@ mod tests {
let b = Field::new("b", DataType::Int32, true);
let c = Field::new("c", DataType::Int32, true);
let d = Field::new("d", DataType::Int32, true);
- let schema = Arc::new(Schema::new(vec![a, b, c, d]));
+ let e = Field::new("e", DataType::Int32, true);
+ let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
Ok(schema)
}
+ /// Make a PhysicalSortExpr with default options
+ fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+ sort_expr_options(name, schema, SortOptions::default())
+ }
+
+ /// PhysicalSortExpr with specified options
+ fn sort_expr_options(
+ name: &str,
+ schema: &Schema,
+ options: SortOptions,
+ ) -> PhysicalSortExpr {
+ PhysicalSortExpr {
+ expr: col(name, schema).unwrap(),
+ options,
+ }
+ }
+
+ #[tokio::test]
+ async fn test_get_partition_by_ordering() -> Result<()> {
+ let test_schema = create_test_schema2()?;
+ // Columns a,c are nullable whereas b,d are not nullable.
+ // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC
NULLS FIRST, d ASC NULLS FIRST
+ // Column e is not ordered.
+ let sort_exprs = vec![
+ sort_expr("a", &test_schema),
+ sort_expr("b", &test_schema),
+ sort_expr("c", &test_schema),
+ sort_expr("d", &test_schema),
+ ];
+ // Input is ordered by a,b,c,d
+ let input = csv_exec_sorted(&test_schema, sort_exprs, true);
+ let test_data = vec![
+ (vec!["a", "b"], vec![0, 1]),
+ (vec!["b", "a"], vec![1, 0]),
+ (vec!["b", "a", "c"], vec![1, 0, 2]),
+ (vec!["d", "b", "a"], vec![2, 1]),
+ (vec!["d", "e", "a"], vec![2]),
+ ];
+ for (pb_names, expected) in test_data {
+ let pb_exprs = pb_names
+ .iter()
+ .map(|name| col(name, &test_schema))
+ .collect::<Result<Vec<_>>>()?;
+ assert_eq!(
+ get_ordered_partition_by_indices(&pb_exprs, &input),
+ expected
+ );
+ }
+ Ok(())
+ }
+
#[tokio::test]
async fn test_calc_requirements() -> Result<()> {
let schema = create_test_schema2()?;
@@ -298,7 +385,7 @@ mod tests {
expected = Some(vec![res]);
}
}
- assert_eq!(calc_requirements(&partitionbys, &orderbys), expected);
+ assert_eq!(calc_requirements(partitionbys, orderbys), expected);
}
Ok(())
}
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 520784fb9a..bda1b52ff8 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -24,7 +24,9 @@ use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::metrics::{
BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
};
-use crate::physical_plan::windows::calc_requirements;
+use crate::physical_plan::windows::{
+ calc_requirements, get_ordered_partition_by_indices,
+};
use crate::physical_plan::{
ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
@@ -38,7 +40,7 @@ use arrow::{
datatypes::{Schema, SchemaRef},
record_batch::RecordBatch,
};
-use datafusion_common::utils::evaluate_partition_ranges;
+use datafusion_common::utils::{evaluate_partition_ranges, get_at_indices};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::PhysicalSortRequirement;
use futures::stream::Stream;
@@ -63,6 +65,9 @@ pub struct WindowAggExec {
pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
+ /// Partition by indices that define a prefix of the existing ordering
+ // See `get_ordered_partition_by_indices` for more details.
+ ordered_partition_by_indices: Vec<usize>,
}
impl WindowAggExec {
@@ -76,6 +81,8 @@ impl WindowAggExec {
let schema = create_schema(&input_schema, &window_expr)?;
let schema = Arc::new(schema);
+ let ordered_partition_by_indices =
+ get_ordered_partition_by_indices(window_expr[0].partition_by(),
&input);
Ok(Self {
input,
window_expr,
@@ -83,6 +90,7 @@ impl WindowAggExec {
input_schema,
partition_keys,
metrics: ExecutionPlanMetricsSet::new(),
+ ordered_partition_by_indices,
})
}
@@ -107,20 +115,9 @@ impl WindowAggExec {
// Hence returned `PhysicalSortExpr` corresponding to `PARTITION BY`
columns can be used safely
// to calculate partition separation points
pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
- let mut result = vec![];
- // All window exprs have the same partition by, so we just use the
first one:
- let partition_by = self.window_expr()[0].partition_by();
+ // The indices of the partition by sort keys are stored in
`self.ordered_partition_by_indices`.
let sort_keys = self.input.output_ordering().unwrap_or(&[]);
- for item in partition_by {
- if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
- result.push(a.clone());
- } else {
- return Err(DataFusionError::Execution(
- "Partition key not found in sort keys".to_string(),
- ));
- }
- }
- Ok(result)
+ get_at_indices(sort_keys, &self.ordered_partition_by_indices)
}
}
@@ -171,8 +168,15 @@ impl ExecutionPlan for WindowAggExec {
fn required_input_ordering(&self) ->
Vec<Option<Vec<PhysicalSortRequirement>>> {
let partition_bys = self.window_expr()[0].partition_by();
let order_keys = self.window_expr()[0].order_by();
- let requirements = calc_requirements(partition_bys, order_keys);
- vec![requirements]
+ if self.ordered_partition_by_indices.len() < partition_bys.len() {
+ vec![calc_requirements(partition_bys, order_keys)]
+ } else {
+ let partition_bys = self
+ .ordered_partition_by_indices
+ .iter()
+ .map(|idx| &partition_bys[*idx]);
+ vec![calc_requirements(partition_bys, order_keys)]
+ }
}
fn required_input_distribution(&self) -> Vec<Distribution> {
@@ -211,7 +215,8 @@ impl ExecutionPlan for WindowAggExec {
input,
BaselineMetrics::new(&self.metrics, partition),
self.partition_by_sort_keys()?,
- ));
+ self.ordered_partition_by_indices.clone(),
+ )?);
Ok(stream)
}
@@ -300,6 +305,7 @@ pub struct WindowAggStream {
window_expr: Vec<Arc<dyn WindowExpr>>,
partition_by_sort_keys: Vec<PhysicalSortExpr>,
baseline_metrics: BaselineMetrics,
+ ordered_partition_by_indices: Vec<usize>,
}
impl WindowAggStream {
@@ -310,8 +316,15 @@ impl WindowAggStream {
input: SendableRecordBatchStream,
baseline_metrics: BaselineMetrics,
partition_by_sort_keys: Vec<PhysicalSortExpr>,
- ) -> Self {
- Self {
+ ordered_partition_by_indices: Vec<usize>,
+ ) -> Result<Self> {
+ // In WindowAggExec all partition by columns should be ordered.
+ if window_expr[0].partition_by().len() !=
ordered_partition_by_indices.len() {
+ return Err(DataFusionError::Internal(
+ "All partition by columns should have an ordering".to_string(),
+ ));
+ }
+ Ok(Self {
schema,
input,
batches: vec![],
@@ -319,7 +332,8 @@ impl WindowAggStream {
window_expr,
baseline_metrics,
partition_by_sort_keys,
- }
+ ordered_partition_by_indices,
+ })
}
fn compute_aggregates(&self) -> Result<RecordBatch> {
@@ -331,9 +345,9 @@ impl WindowAggStream {
}
let partition_by_sort_keys = self
- .partition_by_sort_keys
+ .ordered_partition_by_indices
.iter()
- .map(|elem| elem.evaluate_to_sort_column(&batch))
+ .map(|idx|
self.partition_by_sort_keys[*idx].evaluate_to_sort_column(&batch))
.collect::<Result<Vec<_>>>()?;
let partition_points =
evaluate_partition_ranges(batch.num_rows(),
&partition_by_sort_keys)?;
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index e2035fb227..f99071a6f1 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -38,7 +38,8 @@ use arrow::record_batch::{RecordBatch, RecordBatchOptions};
use bzip2::write::BzEncoder;
#[cfg(feature = "compression")]
use bzip2::Compression as BzCompression;
-use datafusion_common::DataFusionError;
+use datafusion_common::{DataFusionError, Statistics};
+use datafusion_physical_expr::PhysicalSortExpr;
#[cfg(feature = "compression")]
use flate2::write::GzEncoder;
#[cfg(feature = "compression")]
@@ -326,6 +327,32 @@ pub fn create_vec_batches(schema: &Schema, n: usize) ->
Vec<RecordBatch> {
vec
}
+/// Create a sorted CSV exec for testing
+pub fn csv_exec_sorted(
+ schema: &SchemaRef,
+ sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+ infinite_source: bool,
+) -> Arc<dyn ExecutionPlan> {
+ let sort_exprs = sort_exprs.into_iter().collect();
+
+ Arc::new(CsvExec::new(
+ FileScanConfig {
+ object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+ file_schema: schema.clone(),
+ file_groups: vec![vec![PartitionedFile::new("x".to_string(),
100)]],
+ statistics: Statistics::default(),
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ output_ordering: Some(sort_exprs),
+ infinite_source,
+ },
+ false,
+ 0,
+ FileCompressionType::UNCOMPRESSED,
+ ))
+}
+
/// Create batch
fn create_batch(schema: &Schema) -> RecordBatch {
RecordBatch::try_new(
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 2213c95ec7..0170eed929 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2035,8 +2035,115 @@ SELECT
-1114 -1927628110
15673 -1899175111
+# test_window_agg_partition_by_set
+# These tests check whether BoundedWindowAggExec and WindowAggExec treat
partition by expressions as a set.
+# The physical plan shouldn't have any SortExec between the window executors.
+
statement ok
-set datafusion.execution.target_partitions = 2;
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT
+ c9,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND 5 FOLLOWING) as sum1,
+ SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND 5 FOLLOWING) as sum2,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9, c8 ASC ROWS BETWEEN 1
PRECEDING AND UNBOUNDED FOLLOWING) as sum3,
+ SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9, c8 ASC ROWS BETWEEN 1
PRECEDING AND UNBOUNDED FOLLOWING) as sum4
+ FROM aggregate_test_100
+ ORDER BY c9
+ LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+ Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+ Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2,
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1, aggregate_test_100. [...]
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
+ TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5,
c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+ SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+ ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1,
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2,
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND 5 FOLLOWING@16 as sum2, SUM(aggregate_test_100.c9)
PARTITION BY [aggregate_test_100.c1, aggregate_test [...]
+ BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+ WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+ BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field {
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id:
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+ WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name:
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC
NULLS LAST,c8@7 ASC NULLS LAST]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query IIIII
+SELECT c9,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND 5 FOLLOWING) as sum1,
+ SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND 5 FOLLOWING) as sum2,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9, c8 ASC ROWS BETWEEN 1
PRECEDING AND UNBOUNDED FOLLOWING) as sum3,
+ SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9, c8 ASC ROWS BETWEEN 1
PRECEDING AND UNBOUNDED FOLLOWING) as sum4
+ FROM aggregate_test_100
+ ORDER BY c9
+ LIMIT 5
+----
+28774375 9144476174 9144476174 12665844451 12665844451
+63044568 5125627947 5125627947 5125627947 5125627947
+141047417 3650978969 3650978969 3650978969 3650978969
+141680161 8526017165 8526017165 11924524414 11924524414
+145294611 6802765992 6802765992 6802765992 6802765992
+
+
+# test_window_agg_child_equivalence
+
+query TT
+EXPLAIN SELECT c9,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND 5 FOLLOWING) as sum1,
+ SUM(c9) OVER(PARTITION BY c2, c1_alias ORDER BY c9 ASC ROWS BETWEEN 1
PRECEDING AND 5 FOLLOWING) as sum2,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9, c8 ASC ROWS BETWEEN 1
PRECEDING AND UNBOUNDED FOLLOWING) as sum3,
+ SUM(c9) OVER(PARTITION BY c2, c1_alias ORDER BY c9, c8 ASC ROWS BETWEEN 1
PRECEDING AND UNBOUNDED FOLLOWING) as sum4
+ FROM (SELECT c1, c2, c8, c9, c1 as c1_alias
+ FROM aggregate_test_100
+ ORDER BY c9) t1
+ LIMIT 5
+----
+logical_plan
+Projection: t1.c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(t1.c9)
PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN
1 PRECEDING AND 5 FOLLOWING AS sum2, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2]
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING AS sum3, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias]
ORDER BY [t1.c9 ASC NULLS LAS [...]
+ Limit: skip=0, fetch=5
+ WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias]
ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias]
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER
BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+ WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2]
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING
AND UNBOUNDED FOLLOWING]]
+ SubqueryAlias: t1
+ Sort: aggregate_test_100.c9 ASC NULLS LAST
+ Projection: aggregate_test_100.c1, aggregate_test_100.c2,
aggregate_test_100.c8, aggregate_test_100.c9, aggregate_test_100.c1 AS c1_alias
+ TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
+physical_plan
+ProjectionExec: expr=[c9@3 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER
BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum1,
SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST]
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@8 as sum2, SUM(t1.c9) PARTITION BY
[t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS
BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@5 as sum3, SUM(t1.c9) PARTITION BY
[t1.c2, t1.c1_alias] ORDER BY [...]
+ GlobalLimitExec: skip=0, fetch=5
+ BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+ WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+ BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+ WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)",
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+ SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC
NULLS LAST,c8@2 ASC NULLS LAST]
+ ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8, c9@3
as c9, c1@0 as c1_alias]
+ CsvExec: files={1 group:
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true,
limit=None, projection=[c1, c2, c8, c9]
+
+
+query IIIII
+SELECT c9,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING
AND 5 FOLLOWING) as sum1,
+ SUM(c9) OVER(PARTITION BY c2, c1_alias ORDER BY c9 ASC ROWS BETWEEN 1
PRECEDING AND 5 FOLLOWING) as sum2,
+ SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9, c8 ASC ROWS BETWEEN 1
PRECEDING AND UNBOUNDED FOLLOWING) as sum3,
+ SUM(c9) OVER(PARTITION BY c2, c1_alias ORDER BY c9, c8 ASC ROWS BETWEEN 1
PRECEDING AND UNBOUNDED FOLLOWING) as sum4
+ FROM (SELECT c1, c2, c8, c9, c1 as c1_alias
+ FROM aggregate_test_100
+ ORDER BY c9) t1
+ LIMIT 5
+----
+774637006 12189635055 12189635055 12189635055 12189635055
+1454057357 12189635055 12189635055 12189635055 12189635055
+2669374863 11414998049 11414998049 11414998049 11414998049
+3276123488 9960940692 9960940692 9960940692 9960940692
+4015442341 7291565829 7291565829 7291565829 7291565829
+
# test_window_agg_with_bounded_group
query TT
diff --git a/datafusion/physical-expr/src/utils.rs
b/datafusion/physical-expr/src/utils.rs
index 1dd7a2609a..e459246692 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -29,6 +29,7 @@ use datafusion_common::tree_node::{
};
use petgraph::graph::NodeIndex;
use petgraph::stable_graph::StableGraph;
+use std::borrow::Borrow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;
@@ -153,19 +154,14 @@ pub fn normalize_expr_with_equivalence_properties(
expr.clone()
.transform(&|expr| {
let normalized_form: Option<Arc<dyn PhysicalExpr>> =
- match expr.as_any().downcast_ref::<Column>() {
- Some(column) => {
- let mut normalized: Option<Arc<dyn PhysicalExpr>> =
None;
- for class in eq_properties {
- if class.contains(column) {
- normalized =
Some(Arc::new(class.head().clone()));
- break;
- }
+ expr.as_any().downcast_ref::<Column>().and_then(|column| {
+ for class in eq_properties {
+ if class.contains(column) {
+ return Some(Arc::new(class.head().clone()) as _);
}
- normalized
}
- None => None,
- };
+ None
+ });
Ok(if let Some(normalized_form) = normalized_form {
Transformed::Yes(normalized_form)
} else {
@@ -376,16 +372,61 @@ pub fn map_columns_before_projection(
parent_required
.iter()
.filter_map(|r| {
- if let Some(column) = r.as_any().downcast_ref::<Column>() {
- column_mapping.get(column.name())
- } else {
- None
- }
+ r.as_any()
+ .downcast_ref::<Column>()
+ .and_then(|c| column_mapping.get(c.name()))
})
.map(|e| Arc::new(e.clone()) as _)
.collect()
}
+/// This function returns all `Arc<dyn PhysicalExpr>`s inside the given
+/// `PhysicalSortExpr` sequence.
+pub fn convert_to_expr<T: Borrow<PhysicalSortExpr>>(
+ sequence: impl IntoIterator<Item = T>,
+) -> Vec<Arc<dyn PhysicalExpr>> {
+ sequence
+ .into_iter()
+ .map(|elem| elem.borrow().expr.clone())
+ .collect()
+}
+
+/// This function finds the indices of `targets` within `items`, taking into
+/// account equivalences according to `equal_properties`.
+pub fn get_indices_of_matching_exprs<
+ T: Borrow<Arc<dyn PhysicalExpr>>,
+ F: FnOnce() -> EquivalenceProperties,
+>(
+ targets: impl IntoIterator<Item = T>,
+ items: &[Arc<dyn PhysicalExpr>],
+ equal_properties: F,
+) -> Vec<usize> {
+ if let eq_classes @ [_, ..] = equal_properties().classes() {
+ let normalized_targets = targets.into_iter().map(|e| {
+ normalize_expr_with_equivalence_properties(e.borrow().clone(),
eq_classes)
+ });
+ let normalized_items = items
+ .iter()
+ .map(|e| normalize_expr_with_equivalence_properties(e.clone(),
eq_classes))
+ .collect::<Vec<_>>();
+ get_indices_of_exprs_strict(normalized_targets, &normalized_items)
+ } else {
+ get_indices_of_exprs_strict(targets, items)
+ }
+}
+
+/// This function finds the indices of `targets` within `items` using strict
+/// equality.
+fn get_indices_of_exprs_strict<T: Borrow<Arc<dyn PhysicalExpr>>>(
+ targets: impl IntoIterator<Item = T>,
+ items: &[Arc<dyn PhysicalExpr>],
+) -> Vec<usize> {
+ targets
+ .into_iter()
+ .filter_map(|target| items.iter().position(|e| e.eq(target.borrow())))
+ .collect()
+}
+
#[derive(Clone, Debug)]
pub struct ExprTreeNode<T> {
expr: Arc<dyn PhysicalExpr>,
@@ -674,6 +715,42 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_convert_to_expr() -> Result<()> {
+ let schema = Schema::new(vec![Field::new("a", DataType::UInt64,
false)]);
+ let sort_expr = vec![PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: Default::default(),
+ }];
+ assert!(convert_to_expr(&sort_expr)[0].eq(&sort_expr[0].expr));
+ Ok(())
+ }
+
+ #[test]
+ fn test_get_indices_of_matching_exprs() {
+ let empty_schema = &Arc::new(Schema::empty());
+ let equal_properties = ||
EquivalenceProperties::new(empty_schema.clone());
+ let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
+ Arc::new(Column::new("a", 0)),
+ Arc::new(Column::new("b", 1)),
+ Arc::new(Column::new("c", 2)),
+ Arc::new(Column::new("d", 3)),
+ ];
+ let list2: Vec<Arc<dyn PhysicalExpr>> = vec![
+ Arc::new(Column::new("b", 1)),
+ Arc::new(Column::new("c", 2)),
+ Arc::new(Column::new("a", 0)),
+ ];
+ assert_eq!(
+ get_indices_of_matching_exprs(&list1, &list2, equal_properties),
+ vec![2, 0, 1]
+ );
+ assert_eq!(
+ get_indices_of_matching_exprs(&list2, &list1, equal_properties),
+ vec![1, 2, 0]
+ );
+ }
+
#[test]
fn expr_list_eq_test() -> Result<()> {
let list1: Vec<Arc<dyn PhysicalExpr>> = vec![