This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 26a36024c2 Move window analysis to the window method (#7672)
26a36024c2 is described below
commit 26a36024c27c5dacdf7ec2d530a5de7673a41f5f
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Sep 28 20:24:48 2023 +0300
Move window analysis to the window method (#7672)
* Move window analysis to the method
* Preserve physical partition keys during recreation of the window
* Final review
* Avoid cloning if not necessary
---------
Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
datafusion/common/src/utils.rs | 104 ++++
.../src/physical_optimizer/enforce_distribution.rs | 7 +-
.../core/src/physical_optimizer/enforce_sorting.rs | 661 ++------------------
.../replace_with_order_preserving_variants.rs | 9 +-
datafusion/core/src/physical_optimizer/utils.rs | 136 -----
datafusion/physical-plan/src/lib.rs | 11 +
.../src/windows/bounded_window_agg_exec.rs | 35 +-
datafusion/physical-plan/src/windows/mod.rs | 680 ++++++++++++++++++++-
.../physical-plan/src/windows/window_agg_exec.rs | 14 +-
9 files changed, 873 insertions(+), 784 deletions(-)
diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 6141723b06..b7c80aa9ac 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -28,6 +28,7 @@ use sqlparser::dialect::GenericDialect;
use sqlparser::parser::Parser;
use std::borrow::{Borrow, Cow};
use std::cmp::Ordering;
+use std::collections::HashSet;
use std::ops::Range;
use std::sync::Arc;
@@ -429,6 +430,64 @@ pub mod datafusion_strsim {
}
}
+/// Merges collections `first` and `second`, removes duplicates and sorts the
+/// result, returning it as a [`Vec`].
+pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
+ first: impl IntoIterator<Item = T>,
+ second: impl IntoIterator<Item = S>,
+) -> Vec<usize> {
+ let mut result: Vec<_> = first
+ .into_iter()
+ .map(|e| *e.borrow())
+ .chain(second.into_iter().map(|e| *e.borrow()))
+ .collect::<HashSet<_>>()
+ .into_iter()
+ .collect();
+ result.sort();
+ result
+}
+
+/// Calculates the set difference between sequences `first` and `second`,
+/// returning the result as a [`Vec`]. Preserves the ordering of `first`.
+pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
+ first: impl IntoIterator<Item = T>,
+ second: impl IntoIterator<Item = S>,
+) -> Vec<usize> {
+ let set: HashSet<_> = second.into_iter().map(|e| *e.borrow()).collect();
+ first
+ .into_iter()
+ .map(|e| *e.borrow())
+ .filter(|e| !set.contains(e))
+ .collect()
+}
+
+/// Checks whether the given index sequence is monotonically non-decreasing.
+pub fn is_sorted<T: Borrow<usize>>(sequence: impl IntoIterator<Item = T>) ->
bool {
+ // TODO: Remove this function when `is_sorted` graduates from Rust nightly.
+ let mut previous = 0;
+ for item in sequence.into_iter() {
+ let current = *item.borrow();
+ if current < previous {
+ return false;
+ }
+ previous = current;
+ }
+ true
+}
+
+/// Find indices of each element in `targets` inside `items`. If one of the
+/// elements is absent in `items`, returns an error.
+pub fn find_indices<T: PartialEq, S: Borrow<T>>(
+ items: &[T],
+ targets: impl IntoIterator<Item = S>,
+) -> Result<Vec<usize>> {
+ targets
+ .into_iter()
+ .map(|target| items.iter().position(|e| target.borrow().eq(e)))
+ .collect::<Option<_>>()
+ .ok_or_else(|| DataFusionError::Execution("Target not
found".to_string()))
+}
+
#[cfg(test)]
mod tests {
use crate::ScalarValue;
@@ -747,4 +806,49 @@ mod tests {
"cloned `Arc` should point to same data as the original"
);
}
+
+ #[test]
+ fn test_merge_and_order_indices() {
+ assert_eq!(
+ merge_and_order_indices([0, 3, 4], [1, 3, 5]),
+ vec![0, 1, 3, 4, 5]
+ );
+ // Result should be ordered, even if inputs are not
+ assert_eq!(
+ merge_and_order_indices([3, 0, 4], [5, 1, 3]),
+ vec![0, 1, 3, 4, 5]
+ );
+ }
+
+ #[test]
+ fn test_set_difference() {
+ assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
+ assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
+ // return value should have same ordering with the in1
+ assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
+ assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
+ assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
+ }
+
+ #[test]
+ fn test_is_sorted() {
+ assert!(is_sorted::<usize>([]));
+ assert!(is_sorted([0]));
+ assert!(is_sorted([0, 3, 4]));
+ assert!(is_sorted([0, 1, 2]));
+ assert!(is_sorted([0, 1, 4]));
+ assert!(is_sorted([0usize; 0]));
+ assert!(is_sorted([1, 2]));
+ assert!(!is_sorted([3, 2]));
+ }
+
+ #[test]
+ fn test_find_indices() -> Result<()> {
+ assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
+ assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
+ assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
+ assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
+ assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
+ Ok(())
+ }
}
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index fea7f8c5d8..b2a1a03383 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -28,9 +28,7 @@ use std::sync::Arc;
use crate::config::ConfigOptions;
use crate::datasource::physical_plan::{CsvExec, ParquetExec};
use crate::error::{DataFusionError, Result};
-use crate::physical_optimizer::utils::{
- add_sort_above, get_plan_string, unbounded_output, ExecTree,
-};
+use crate::physical_optimizer::utils::{add_sort_above, get_plan_string,
ExecTree};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode,
PhysicalGroupBy};
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -46,6 +44,7 @@ use crate::physical_plan::windows::WindowAggExec;
use crate::physical_plan::Partitioning;
use crate::physical_plan::{with_new_children_if_necessary, Distribution,
ExecutionPlan};
+use datafusion_common::internal_err;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_expr::logical_plan::JoinType;
use datafusion_physical_expr::equivalence::EquivalenceProperties;
@@ -56,8 +55,8 @@ use datafusion_physical_expr::utils::{
use datafusion_physical_expr::{
expr_list_eq_strict_order, PhysicalExpr, PhysicalSortRequirement,
};
+use datafusion_physical_plan::unbounded_output;
-use datafusion_common::internal_err;
use itertools::izip;
/// The `EnforceDistribution` rule ensures that distribution requirements are
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index b6f2adac1b..a149330181 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -43,30 +43,26 @@ use crate::physical_optimizer::replace_with_order_preserving_variants::{
};
use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown};
use crate::physical_optimizer::utils::{
- add_sort_above, find_indices, is_coalesce_partitions, is_limit,
is_repartition,
- is_sort, is_sort_preserving_merge, is_sorted, is_union, is_window,
- merge_and_order_indices, set_difference, unbounded_output, ExecTree,
+ add_sort_above, is_coalesce_partitions, is_limit, is_repartition, is_sort,
+ is_sort_preserving_merge, is_union, is_window, ExecTree,
};
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
use crate::physical_plan::sorts::sort::SortExec;
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::windows::{
- BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
+ get_best_fitting_window, BoundedWindowAggExec, PartitionSearchMode,
WindowAggExec,
};
use crate::physical_plan::{with_new_children_if_necessary, Distribution,
ExecutionPlan};
-use arrow::datatypes::SchemaRef;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
-use datafusion_common::utils::{get_at_indices, longest_consecutive_prefix};
use datafusion_common::{plan_err, DataFusionError};
use datafusion_physical_expr::utils::{
- convert_to_expr, get_indices_of_matching_exprs, ordering_satisfy,
- ordering_satisfy_requirement_concrete,
+ ordering_satisfy, ordering_satisfy_requirement_concrete,
};
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr,
PhysicalSortRequirement};
+use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
-use itertools::{izip, Itertools};
+use itertools::izip;
/// This rule inspects [`SortExec`]'s in the given physical plan and removes
the
/// ones it can prove unnecessary.
@@ -564,106 +560,73 @@ fn analyze_window_sort_removal(
sort_tree: &mut ExecTree,
window_exec: &Arc<dyn ExecutionPlan>,
) -> Result<Option<PlanWithCorrespondingSort>> {
- let (window_expr, partition_keys) =
+ let requires_single_partition = matches!(
+ window_exec.required_input_distribution()[sort_tree.idx],
+ Distribution::SinglePartition
+ );
+ let mut window_child =
+ remove_corresponding_sort_from_sub_plan(sort_tree,
requires_single_partition)?;
+
+ let (window_expr, new_window) =
if let Some(exec) =
window_exec.as_any().downcast_ref::<BoundedWindowAggExec>() {
- (exec.window_expr(), &exec.partition_keys)
+ (
+ exec.window_expr(),
+ get_best_fitting_window(
+ exec.window_expr(),
+ &window_child,
+ &exec.partition_keys,
+ )?,
+ )
} else if let Some(exec) =
window_exec.as_any().downcast_ref::<WindowAggExec>() {
- (exec.window_expr(), &exec.partition_keys)
+ (
+ exec.window_expr(),
+ get_best_fitting_window(
+ exec.window_expr(),
+ &window_child,
+ &exec.partition_keys,
+ )?,
+ )
} else {
return plan_err!(
"Expects to receive either WindowAggExec of
BoundedWindowAggExec"
);
};
let partitionby_exprs = window_expr[0].partition_by();
- let orderby_sort_keys = window_expr[0].order_by();
-
- // search_flags stores return value of the can_skip_sort.
- // `None` case represents `SortExec` cannot be removed.
- // `PartitionSearch` mode stores at which mode executor should work to
remove
- // `SortExec` before it,
- // `bool` stores whether or not we need to reverse window expressions to
remove `SortExec`.
- let mut search_flags = None;
- for sort_any in sort_tree.get_leaves() {
- // Variable `sort_any` will either be a `SortExec` or a
- // `SortPreservingMergeExec`, and both have a single child.
- // Therefore, we can use the 0th index without loss of generality.
- let sort_input = &sort_any.children()[0];
- let flags = can_skip_sort(partitionby_exprs, orderby_sort_keys,
sort_input)?;
- if flags.is_some() && (search_flags.is_none() || search_flags ==
flags) {
- search_flags = flags;
- continue;
- }
- // We can not skip the sort, or window reversal requirements are not
- // uniform; then sort removal is not possible -- we immediately return.
- return Ok(None);
- }
- let (should_reverse, partition_search_mode) = if let Some(search_flags) =
search_flags
- {
- search_flags
- } else {
- // We can not skip the sort return:
- return Ok(None);
- };
- let is_unbounded = unbounded_output(window_exec);
- if !is_unbounded && partition_search_mode != PartitionSearchMode::Sorted {
- // Executor has bounded input and `partition_search_mode` is not
`PartitionSearchMode::Sorted`
- // in this case removing the sort is not helpful, return:
- return Ok(None);
- };
- let new_window_expr = if should_reverse {
- window_expr
- .iter()
- .map(|e| e.get_reverse_expr())
- .collect::<Option<Vec<_>>>()
+ if let Some(new_window) = new_window {
+ // We were able to change the window to accommodate the input, use it:
+ Ok(Some(PlanWithCorrespondingSort::new(new_window)))
} else {
- Some(window_expr.to_vec())
- };
- if let Some(window_expr) = new_window_expr {
- let requires_single_partition = matches!(
- window_exec.required_input_distribution()[sort_tree.idx],
- Distribution::SinglePartition
- );
- let mut new_child = remove_corresponding_sort_from_sub_plan(
- sort_tree,
- requires_single_partition,
- )?;
- let new_schema = new_child.schema();
+ // We were unable to change the window to accommodate the input, so we
+ // will insert a sort.
+ let reqs = window_exec
+ .required_input_ordering()
+ .swap_remove(0)
+ .unwrap_or(vec![]);
+ let sort_expr = PhysicalSortRequirement::to_sort_exprs(reqs);
+ // Satisfy the ordering requirement so that the window can run:
+ add_sort_above(&mut window_child, sort_expr, None)?;
let uses_bounded_memory = window_expr.iter().all(|e|
e.uses_bounded_memory());
- // If all window expressions can run with bounded memory, choose the
- // bounded window variant:
- let new_plan = if uses_bounded_memory {
+ let input_schema = window_child.schema();
+ let new_window = if uses_bounded_memory {
Arc::new(BoundedWindowAggExec::try_new(
- window_expr,
- new_child,
- new_schema,
- partition_keys.to_vec(),
- partition_search_mode,
+ window_expr.to_vec(),
+ window_child,
+ input_schema,
+ partitionby_exprs.to_vec(),
+ PartitionSearchMode::Sorted,
)?) as _
} else {
- if partition_search_mode != PartitionSearchMode::Sorted {
- // For `WindowAggExec` to work correctly PARTITION BY columns
should be sorted.
- // Hence, if `partition_search_mode` is not
`PartitionSearchMode::Sorted` we should convert
- // input ordering such that it can work with
PartitionSearchMode::Sorted (add `SortExec`).
- // Effectively `WindowAggExec` works only in
PartitionSearchMode::Sorted mode.
- let reqs = window_exec
- .required_input_ordering()
- .swap_remove(0)
- .unwrap_or(vec![]);
- let sort_expr = PhysicalSortRequirement::to_sort_exprs(reqs);
- add_sort_above(&mut new_child, sort_expr, None)?;
- };
Arc::new(WindowAggExec::try_new(
- window_expr,
- new_child,
- new_schema,
- partition_keys.to_vec(),
+ window_expr.to_vec(),
+ window_child,
+ input_schema,
+ partitionby_exprs.to_vec(),
)?) as _
};
- return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+ Ok(Some(PlanWithCorrespondingSort::new(new_window)))
}
- Ok(None)
}
/// Updates child to remove the unnecessary [`CoalescePartitionsExec`] below
it.
@@ -784,155 +747,12 @@ fn get_sort_exprs(
}
}
-/// Compares physical ordering (output ordering of input executor) with
-/// `partitionby_exprs` and `orderby_keys`
-/// to decide whether existing ordering is sufficient to run current window
executor.
-/// A `None` return value indicates that we can not remove the sort in
question (input ordering is not
-/// sufficient to run current window executor).
-/// A `Some((bool, PartitionSearchMode))` value indicates window executor can
be run with existing input ordering
-/// (Hence we can remove [`SortExec`] before it).
-/// `bool` represents whether we should reverse window executor to remove
[`SortExec`] before it.
-/// `PartitionSearchMode` represents, in which mode Window Executor should
work with existing ordering.
-fn can_skip_sort(
- partitionby_exprs: &[Arc<dyn PhysicalExpr>],
- orderby_keys: &[PhysicalSortExpr],
- input: &Arc<dyn ExecutionPlan>,
-) -> Result<Option<(bool, PartitionSearchMode)>> {
- let physical_ordering = if let Some(physical_ordering) =
input.output_ordering() {
- physical_ordering
- } else {
- // If there is no physical ordering, there is no way to remove a
- // sort, so immediately return.
- return Ok(None);
- };
- let orderby_exprs = convert_to_expr(orderby_keys);
- let physical_ordering_exprs = convert_to_expr(physical_ordering);
- let equal_properties = || input.equivalence_properties();
- // indices of the order by expressions among input ordering expressions
- let ob_indices = get_indices_of_matching_exprs(
- &orderby_exprs,
- &physical_ordering_exprs,
- equal_properties,
- );
- if ob_indices.len() != orderby_exprs.len() {
- // If all order by expressions are not in the input ordering,
- // there is no way to remove a sort -- immediately return:
- return Ok(None);
- }
- // indices of the partition by expressions among input ordering expressions
- let pb_indices = get_indices_of_matching_exprs(
- partitionby_exprs,
- &physical_ordering_exprs,
- equal_properties,
- );
- let ordered_merged_indices = merge_and_order_indices(&pb_indices,
&ob_indices);
- // Indices of order by columns that doesn't seen in partition by
- // Equivalently (Order by columns) ∖ (Partition by columns) where `∖`
represents set difference.
- let unique_ob_indices = set_difference(&ob_indices, &pb_indices);
- if !is_sorted(&unique_ob_indices) {
- // ORDER BY indices should be ascending ordered
- return Ok(None);
- }
- let first_n = longest_consecutive_prefix(ordered_merged_indices);
- let furthest_ob_index = *unique_ob_indices.last().unwrap_or(&0);
- // Cannot skip sort if last order by index is not within consecutive
prefix.
- // For instance, if input is ordered by a,b,c,d
- // for expression `PARTITION BY a, ORDER BY b, d`, `first_n` would be 2
(meaning a, b defines a prefix for input ordering)
- // Whereas `furthest_ob_index` would be 3 (column d occurs at the 3rd
index of the existing ordering.)
- // Hence existing ordering is not sufficient to run current Executor.
- // However, for expression `PARTITION BY a, ORDER BY b, c, d`, `first_n`
would be 4 (meaning a, b, c, d defines a prefix for input ordering)
- // Similarly, `furthest_ob_index` would be 3 (column d occurs at the 3rd
index of the existing ordering.)
- // Hence existing ordering would be sufficient to run current Executor.
- if first_n <= furthest_ob_index {
- return Ok(None);
- }
- let input_orderby_columns = get_at_indices(physical_ordering,
&unique_ob_indices)?;
- let expected_orderby_columns =
- get_at_indices(orderby_keys, find_indices(&ob_indices,
&unique_ob_indices)?)?;
- let should_reverse = if let Some(should_reverse) = check_alignments(
- &input.schema(),
- &input_orderby_columns,
- &expected_orderby_columns,
- )? {
- should_reverse
- } else {
- // If ordering directions are not aligned. We cannot calculate result
without changing existing ordering.
- return Ok(None);
- };
-
- let ordered_pb_indices =
pb_indices.iter().copied().sorted().collect::<Vec<_>>();
- // Determine how many elements in the partition by columns defines a
consecutive range from zero.
- let first_n = longest_consecutive_prefix(&ordered_pb_indices);
- let mode = if first_n == partitionby_exprs.len() {
- // All of the partition by columns defines a consecutive range from
zero.
- PartitionSearchMode::Sorted
- } else if first_n > 0 {
- // All of the partition by columns defines a consecutive range from
zero.
- let ordered_range = &ordered_pb_indices[0..first_n];
- let input_pb_exprs = get_at_indices(&physical_ordering_exprs,
ordered_range)?;
- let partially_ordered_indices = get_indices_of_matching_exprs(
- &input_pb_exprs,
- partitionby_exprs,
- equal_properties,
- );
- PartitionSearchMode::PartiallySorted(partially_ordered_indices)
- } else {
- // None of the partition by columns defines a consecutive range from
zero.
- PartitionSearchMode::Linear
- };
-
- Ok(Some((should_reverse, mode)))
-}
-
-fn check_alignments(
- schema: &SchemaRef,
- physical_ordering: &[PhysicalSortExpr],
- required: &[PhysicalSortExpr],
-) -> Result<Option<bool>> {
- let res = izip!(physical_ordering, required)
- .map(|(lhs, rhs)| check_alignment(schema, lhs, rhs))
- .collect::<Result<Option<Vec<_>>>>()?;
- Ok(if let Some(res) = res {
- if !res.is_empty() {
- let first = res[0];
- let all_same = res.into_iter().all(|elem| elem == first);
- all_same.then_some(first)
- } else {
- Some(false)
- }
- } else {
- // Cannot skip some of the requirements in the input.
- None
- })
-}
-
-/// Compares `physical_ordering` and `required` ordering, decides whether
-/// alignments match. A `None` return value indicates that current column is
-/// not aligned. A `Some(bool)` value indicates otherwise, and signals whether
-/// we should reverse the window expression in order to avoid sorting.
-fn check_alignment(
- input_schema: &SchemaRef,
- physical_ordering: &PhysicalSortExpr,
- required: &PhysicalSortExpr,
-) -> Result<Option<bool>> {
- Ok(if required.expr.eq(&physical_ordering.expr) {
- let physical_opts = physical_ordering.options;
- let required_opts = required.options;
- if required.expr.nullable(input_schema)? {
- let reverse = physical_opts == !required_opts;
- (reverse || physical_opts == required_opts).then_some(reverse)
- } else {
- // If the column is not nullable, NULLS FIRST/LAST is not
important.
- Some(physical_opts.descending != required_opts.descending)
- }
- } else {
- None
- })
-}
-
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
use super::*;
+ use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
use crate::physical_optimizer::test_utils::{
aggregate_exec, bounded_window_exec, coalesce_batches_exec,
coalesce_partitions_exec, filter_exec, global_limit_exec,
hash_join_exec,
@@ -940,10 +760,8 @@ mod tests {
repartition_exec, sort_exec, sort_expr, sort_expr_options,
sort_merge_join_exec,
sort_preserving_merge_exec, union_exec,
};
+ use crate::physical_optimizer::utils::get_plan_string;
use crate::physical_plan::repartition::RepartitionExec;
- use crate::physical_plan::windows::PartitionSearchMode::{
- Linear, PartiallySorted, Sorted,
- };
use crate::physical_plan::{displayable, Partitioning};
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::csv_exec_sorted;
@@ -954,11 +772,6 @@ mod tests {
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::expressions::{col, NotExpr};
- use datafusion_physical_expr::PhysicalSortExpr;
-
- use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
- use crate::physical_optimizer::utils::get_plan_string;
- use std::sync::Arc;
fn create_test_schema() -> Result<SchemaRef> {
let nullable_column = Field::new("nullable_col", DataType::Int32,
true);
@@ -986,364 +799,6 @@ mod tests {
Ok(schema)
}
- #[tokio::test]
- async fn test_is_column_aligned_nullable() -> Result<()> {
- let schema = create_test_schema()?;
- let params = vec![
- ((true, true), (false, false), Some(true)),
- ((true, true), (false, true), None),
- ((true, true), (true, false), None),
- ((true, false), (false, true), Some(true)),
- ((true, false), (false, false), None),
- ((true, false), (true, true), None),
- ];
- for (
- (physical_desc, physical_nulls_first),
- (req_desc, req_nulls_first),
- expected,
- ) in params
- {
- let physical_ordering = PhysicalSortExpr {
- expr: col("nullable_col", &schema)?,
- options: SortOptions {
- descending: physical_desc,
- nulls_first: physical_nulls_first,
- },
- };
- let required_ordering = PhysicalSortExpr {
- expr: col("nullable_col", &schema)?,
- options: SortOptions {
- descending: req_desc,
- nulls_first: req_nulls_first,
- },
- };
- let res = check_alignment(&schema, &physical_ordering,
&required_ordering)?;
- assert_eq!(res, expected);
- }
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_is_column_aligned_non_nullable() -> Result<()> {
- let schema = create_test_schema()?;
-
- let params = vec![
- ((true, true), (false, false), Some(true)),
- ((true, true), (false, true), Some(true)),
- ((true, true), (true, false), Some(false)),
- ((true, false), (false, true), Some(true)),
- ((true, false), (false, false), Some(true)),
- ((true, false), (true, true), Some(false)),
- ];
- for (
- (physical_desc, physical_nulls_first),
- (req_desc, req_nulls_first),
- expected,
- ) in params
- {
- let physical_ordering = PhysicalSortExpr {
- expr: col("non_nullable_col", &schema)?,
- options: SortOptions {
- descending: physical_desc,
- nulls_first: physical_nulls_first,
- },
- };
- let required_ordering = PhysicalSortExpr {
- expr: col("non_nullable_col", &schema)?,
- options: SortOptions {
- descending: req_desc,
- nulls_first: req_nulls_first,
- },
- };
- let res = check_alignment(&schema, &physical_ordering,
&required_ordering)?;
- assert_eq!(res, expected);
- }
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_can_skip_ordering_exhaustive() -> Result<()> {
- let test_schema = create_test_schema3()?;
- // Columns a,c are nullable whereas b,d are not nullable.
- // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC
NULLS FIRST, d ASC NULLS FIRST
- // Column e is not ordered.
- let sort_exprs = vec![
- sort_expr("a", &test_schema),
- sort_expr("b", &test_schema),
- sort_expr("c", &test_schema),
- sort_expr("d", &test_schema),
- ];
- let exec_unbounded = csv_exec_sorted(&test_schema, sort_exprs, true);
-
- // test cases consists of vector of tuples. Where each tuple
represents a single test case.
- // First field in the tuple is Vec<str> where each element in the
vector represents PARTITION BY columns
- // For instance `vec!["a", "b"]` corresponds to PARTITION BY a, b
- // Second field in the tuple is Vec<str> where each element in the
vector represents ORDER BY columns
- // For instance, vec!["c"], corresponds to ORDER BY c ASC NULLS FIRST,
(ordering is default ordering. We do not check
- // for reversibility in this test).
- // Third field in the tuple is Option<PartitionSearchMode>, which
corresponds to expected algorithm mode.
- // None represents that existing ordering is not sufficient to run
executor with any one of the algorithms
- // (We need to add SortExec to be able to run it).
- // Some(PartitionSearchMode) represents, we can run algorithm with
existing ordering; and algorithm should work in
- // PartitionSearchMode.
- let test_cases = vec![
- (vec!["a"], vec!["a"], Some(Sorted)),
- (vec!["a"], vec!["b"], Some(Sorted)),
- (vec!["a"], vec!["c"], None),
- (vec!["a"], vec!["a", "b"], Some(Sorted)),
- (vec!["a"], vec!["b", "c"], Some(Sorted)),
- (vec!["a"], vec!["a", "c"], None),
- (vec!["a"], vec!["a", "b", "c"], Some(Sorted)),
- (vec!["b"], vec!["a"], Some(Linear)),
- (vec!["b"], vec!["b"], None),
- (vec!["b"], vec!["c"], None),
- (vec!["b"], vec!["a", "b"], Some(Linear)),
- (vec!["b"], vec!["b", "c"], None),
- (vec!["b"], vec!["a", "c"], Some(Linear)),
- (vec!["b"], vec!["a", "b", "c"], Some(Linear)),
- (vec!["c"], vec!["a"], Some(Linear)),
- (vec!["c"], vec!["b"], None),
- (vec!["c"], vec!["c"], None),
- (vec!["c"], vec!["a", "b"], Some(Linear)),
- (vec!["c"], vec!["b", "c"], None),
- (vec!["c"], vec!["a", "c"], Some(Linear)),
- (vec!["c"], vec!["a", "b", "c"], Some(Linear)),
- (vec!["b", "a"], vec!["a"], Some(Sorted)),
- (vec!["b", "a"], vec!["b"], Some(Sorted)),
- (vec!["b", "a"], vec!["c"], Some(Sorted)),
- (vec!["b", "a"], vec!["a", "b"], Some(Sorted)),
- (vec!["b", "a"], vec!["b", "c"], Some(Sorted)),
- (vec!["b", "a"], vec!["a", "c"], Some(Sorted)),
- (vec!["b", "a"], vec!["a", "b", "c"], Some(Sorted)),
- (vec!["c", "b"], vec!["a"], Some(Linear)),
- (vec!["c", "b"], vec!["b"], None),
- (vec!["c", "b"], vec!["c"], None),
- (vec!["c", "b"], vec!["a", "b"], Some(Linear)),
- (vec!["c", "b"], vec!["b", "c"], None),
- (vec!["c", "b"], vec!["a", "c"], Some(Linear)),
- (vec!["c", "b"], vec!["a", "b", "c"], Some(Linear)),
- (vec!["c", "a"], vec!["a"], Some(PartiallySorted(vec![1]))),
- (vec!["c", "a"], vec!["b"], Some(PartiallySorted(vec![1]))),
- (vec!["c", "a"], vec!["c"], Some(PartiallySorted(vec![1]))),
- (
- vec!["c", "a"],
- vec!["a", "b"],
- Some(PartiallySorted(vec![1])),
- ),
- (
- vec!["c", "a"],
- vec!["b", "c"],
- Some(PartiallySorted(vec![1])),
- ),
- (
- vec!["c", "a"],
- vec!["a", "c"],
- Some(PartiallySorted(vec![1])),
- ),
- (
- vec!["c", "a"],
- vec!["a", "b", "c"],
- Some(PartiallySorted(vec![1])),
- ),
- (vec!["c", "b", "a"], vec!["a"], Some(Sorted)),
- (vec!["c", "b", "a"], vec!["b"], Some(Sorted)),
- (vec!["c", "b", "a"], vec!["c"], Some(Sorted)),
- (vec!["c", "b", "a"], vec!["a", "b"], Some(Sorted)),
- (vec!["c", "b", "a"], vec!["b", "c"], Some(Sorted)),
- (vec!["c", "b", "a"], vec!["a", "c"], Some(Sorted)),
- (vec!["c", "b", "a"], vec!["a", "b", "c"], Some(Sorted)),
- ];
- for (case_idx, test_case) in test_cases.iter().enumerate() {
- let (partition_by_columns, order_by_params, expected) = &test_case;
- let mut partition_by_exprs = vec![];
- for col_name in partition_by_columns {
- partition_by_exprs.push(col(col_name, &test_schema)?);
- }
-
- let mut order_by_exprs = vec![];
- for col_name in order_by_params {
- let expr = col(col_name, &test_schema)?;
- // Give default ordering, this is same with input ordering
direction
- // In this test we do check for reversibility.
- let options = SortOptions::default();
- order_by_exprs.push(PhysicalSortExpr { expr, options });
- }
- let res =
- can_skip_sort(&partition_by_exprs, &order_by_exprs,
&exec_unbounded)?;
- // Since reversibility is not important in this test. Convert
Option<(bool, PartitionSearchMode)> to Option<PartitionSearchMode>
- let res = res.map(|(_, mode)| mode);
- assert_eq!(
- res, *expected,
- "Unexpected result for in unbounded test case#: {case_idx:?},
case: {test_case:?}"
- );
- }
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_can_skip_ordering() -> Result<()> {
- let test_schema = create_test_schema3()?;
- // Columns a,c are nullable whereas b,d are not nullable.
- // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC
NULLS FIRST, d ASC NULLS FIRST
- // Column e is not ordered.
- let sort_exprs = vec![
- sort_expr("a", &test_schema),
- sort_expr("b", &test_schema),
- sort_expr("c", &test_schema),
- sort_expr("d", &test_schema),
- ];
- let exec_unbounded = csv_exec_sorted(&test_schema, sort_exprs, true);
-
- // test cases consists of vector of tuples. Where each tuple
represents a single test case.
- // First field in the tuple is Vec<str> where each element in the
vector represents PARTITION BY columns
- // For instance `vec!["a", "b"]` corresponds to PARTITION BY a, b
- // Second field in the tuple is Vec<(str, bool, bool)> where each
element in the vector represents ORDER BY columns
- // For instance, vec![("c", false, false)], corresponds to ORDER BY c
ASC NULLS LAST,
- // similarly, vec![("c", true, true)], corresponds to ORDER BY c DESC
NULLS FIRST,
- // Third field in the tuple is Option<(bool, PartitionSearchMode)>,
which corresponds to expected result.
- // None represents that existing ordering is not sufficient to run
executor with any one of the algorithms
- // (We need to add SortExec to be able to run it).
- // Some((bool, PartitionSearchMode)) represents, we can run algorithm
with existing ordering. Algorithm should work in
- // PartitionSearchMode, bool field represents whether we should
reverse window expressions to run executor with existing ordering.
- // For instance, `Some((false, PartitionSearchMode::Sorted))`,
represents that we shouldn't reverse window expressions. And algorithm
- // should work in Sorted mode to work with existing ordering.
- let test_cases = vec![
- // PARTITION BY a, b ORDER BY c ASC NULLS LAST
- (vec!["a", "b"], vec![("c", false, false)], None),
- // ORDER BY c ASC NULLS FIRST
- (vec![], vec![("c", false, true)], None),
- // PARTITION BY b, ORDER BY c ASC NULLS FIRST
- (vec!["b"], vec![("c", false, true)], None),
- // PARTITION BY a, ORDER BY c ASC NULLS FIRST
- (vec!["a"], vec![("c", false, true)], None),
- // PARTITION BY b, ORDER BY c ASC NULLS FIRST
- (
- vec!["a", "b"],
- vec![("c", false, true), ("e", false, true)],
- None,
- ),
- // PARTITION BY a, ORDER BY b ASC NULLS FIRST
- (vec!["a"], vec![("b", false, true)], Some((false, Sorted))),
- // PARTITION BY a, ORDER BY a ASC NULLS FIRST
- (vec!["a"], vec![("a", false, true)], Some((false, Sorted))),
- // PARTITION BY a, ORDER BY a ASC NULLS LAST
- (vec!["a"], vec![("a", false, false)], Some((false, Sorted))),
- // PARTITION BY a, ORDER BY a DESC NULLS FIRST
- (vec!["a"], vec![("a", true, true)], Some((false, Sorted))),
- // PARTITION BY a, ORDER BY a DESC NULLS LAST
- (vec!["a"], vec![("a", true, false)], Some((false, Sorted))),
- // PARTITION BY a, ORDER BY b ASC NULLS LAST
- (vec!["a"], vec![("b", false, false)], Some((false, Sorted))),
- // PARTITION BY a, ORDER BY b DESC NULLS LAST
- (vec!["a"], vec![("b", true, false)], Some((true, Sorted))),
- // PARTITION BY a, b ORDER BY c ASC NULLS FIRST
- (
- vec!["a", "b"],
- vec![("c", false, true)],
- Some((false, Sorted)),
- ),
- // PARTITION BY b, a ORDER BY c ASC NULLS FIRST
- (
- vec!["b", "a"],
- vec![("c", false, true)],
- Some((false, Sorted)),
- ),
- // PARTITION BY a, b ORDER BY c DESC NULLS LAST
- (
- vec!["a", "b"],
- vec![("c", true, false)],
- Some((true, Sorted)),
- ),
- // PARTITION BY e ORDER BY a ASC NULLS FIRST
- (
- vec!["e"],
- vec![("a", false, true)],
- // For unbounded, expects to work in Linear mode. Shouldn't
reverse window function.
- Some((false, Linear)),
- ),
- // PARTITION BY b, c ORDER BY a ASC NULLS FIRST, c ASC NULLS FIRST
- (
- vec!["b", "c"],
- vec![("a", false, true), ("c", false, true)],
- Some((false, Linear)),
- ),
- // PARTITION BY b ORDER BY a ASC NULLS FIRST
- (vec!["b"], vec![("a", false, true)], Some((false, Linear))),
- // PARTITION BY a, e ORDER BY b ASC NULLS FIRST
- (
- vec!["a", "e"],
- vec![("b", false, true)],
- Some((false, PartiallySorted(vec![0]))),
- ),
- // PARTITION BY a, c ORDER BY b ASC NULLS FIRST
- (
- vec!["a", "c"],
- vec![("b", false, true)],
- Some((false, PartiallySorted(vec![0]))),
- ),
- // PARTITION BY c, a ORDER BY b ASC NULLS FIRST
- (
- vec!["c", "a"],
- vec![("b", false, true)],
- Some((false, PartiallySorted(vec![1]))),
- ),
- // PARTITION BY d, b, a ORDER BY c ASC NULLS FIRST
- (
- vec!["d", "b", "a"],
- vec![("c", false, true)],
- Some((false, PartiallySorted(vec![2, 1]))),
- ),
- // PARTITION BY e, b, a ORDER BY c ASC NULLS FIRST
- (
- vec!["e", "b", "a"],
- vec![("c", false, true)],
- Some((false, PartiallySorted(vec![2, 1]))),
- ),
- // PARTITION BY d, a ORDER BY b ASC NULLS FIRST
- (
- vec!["d", "a"],
- vec![("b", false, true)],
- Some((false, PartiallySorted(vec![1]))),
- ),
- // PARTITION BY b, ORDER BY b, a ASC NULLS FIRST
- (
- vec!["a"],
- vec![("b", false, true), ("a", false, true)],
- Some((false, Sorted)),
- ),
- // ORDER BY b, a ASC NULLS FIRST
- (vec![], vec![("b", false, true), ("a", false, true)], None),
- ];
- for (case_idx, test_case) in test_cases.iter().enumerate() {
- let (partition_by_columns, order_by_params, expected) = &test_case;
- let mut partition_by_exprs = vec![];
- for col_name in partition_by_columns {
- partition_by_exprs.push(col(col_name, &test_schema)?);
- }
-
- let mut order_by_exprs = vec![];
- for (col_name, descending, nulls_first) in order_by_params {
- let expr = col(col_name, &test_schema)?;
- let options = SortOptions {
- descending: *descending,
- nulls_first: *nulls_first,
- };
- order_by_exprs.push(PhysicalSortExpr { expr, options });
- }
-
- assert_eq!(
- can_skip_sort(&partition_by_exprs, &order_by_exprs,
&exec_unbounded)?,
- *expected,
- "Unexpected result for in unbounded test case#: {case_idx:?},
case: {test_case:?}"
- );
- }
-
- Ok(())
- }
-
/// Runs the sort enforcement optimizer and asserts the plan
/// against the original and expected plans
///
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index e223e43cb3..b0ae199a2d 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -19,10 +19,10 @@
//! order-preserving variants when it is helpful; either in terms of
//! performance or to accommodate unbounded streams by fixing the pipeline.
+use std::sync::Arc;
+
use crate::error::Result;
-use crate::physical_optimizer::utils::{
- is_coalesce_partitions, is_sort, unbounded_output, ExecTree,
-};
+use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort,
ExecTree};
use crate::physical_plan::repartition::RepartitionExec;
use
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
@@ -32,8 +32,7 @@ use super::utils::is_repartition;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
use datafusion_physical_expr::utils::ordering_satisfy;
-
-use std::sync::Arc;
+use datafusion_physical_plan::unbounded_output;
/// For a given `plan`, this object carries the information one needs from its
/// descendants to decide whether it is beneficial to replace order-losing (but
diff --git a/datafusion/core/src/physical_optimizer/utils.rs
b/datafusion/core/src/physical_optimizer/utils.rs
index b4dd75e586..21c976e07a 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -17,9 +17,6 @@
//! Collection of utility functions that are leveraged by the query optimizer
rules
-use itertools::concat;
-use std::borrow::Borrow;
-use std::collections::HashSet;
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
@@ -34,7 +31,6 @@ use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{displayable, ExecutionPlan};
-use datafusion_common::DataFusionError;
use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_expr::PhysicalSortExpr;
@@ -75,30 +71,6 @@ impl ExecTree {
children,
}
}
-
- /// This function returns the executors at the leaves of the tree.
- pub fn get_leaves(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- if self.children.is_empty() {
- vec![self.plan.clone()]
- } else {
- concat(self.children.iter().map(|e| e.get_leaves()))
- }
- }
-}
-
-// Get output (un)boundedness information for the given `plan`.
-pub(crate) fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
- let result = if plan.children().is_empty() {
- plan.unbounded_output(&[])
- } else {
- let children_unbounded_output = plan
- .children()
- .iter()
- .map(unbounded_output)
- .collect::<Vec<_>>();
- plan.unbounded_output(&children_unbounded_output)
- };
- result.unwrap_or(true)
}
/// This utility function adds a `SortExec` above an operator according to the
@@ -126,64 +98,6 @@ pub fn add_sort_above(
Ok(())
}
-/// Find indices of each element in `targets` inside `items`. If one of the
-/// elements is absent in `items`, returns an error.
-pub fn find_indices<T: PartialEq, S: Borrow<T>>(
- items: &[T],
- targets: impl IntoIterator<Item = S>,
-) -> Result<Vec<usize>> {
- targets
- .into_iter()
- .map(|target| items.iter().position(|e| target.borrow().eq(e)))
- .collect::<Option<_>>()
- .ok_or_else(|| DataFusionError::Execution("Target not
found".to_string()))
-}
-
-/// Merges collections `first` and `second`, removes duplicates and sorts the
-/// result, returning it as a [`Vec`].
-pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
- first: impl IntoIterator<Item = T>,
- second: impl IntoIterator<Item = S>,
-) -> Vec<usize> {
- let mut result: Vec<_> = first
- .into_iter()
- .map(|e| *e.borrow())
- .chain(second.into_iter().map(|e| *e.borrow()))
- .collect::<HashSet<_>>()
- .into_iter()
- .collect();
- result.sort();
- result
-}
-
-/// Checks whether the given index sequence is monotonically non-decreasing.
-pub fn is_sorted<T: Borrow<usize>>(sequence: impl IntoIterator<Item = T>) ->
bool {
- // TODO: Remove this function when `is_sorted` graduates from Rust nightly.
- let mut previous = 0;
- for item in sequence.into_iter() {
- let current = *item.borrow();
- if current < previous {
- return false;
- }
- previous = current;
- }
- true
-}
-
-/// Calculates the set difference between sequences `first` and `second`,
-/// returning the result as a [`Vec`]. Preserves the ordering of `first`.
-pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
- first: impl IntoIterator<Item = T>,
- second: impl IntoIterator<Item = S>,
-) -> Vec<usize> {
- let set: HashSet<_> = second.into_iter().map(|e| *e.borrow()).collect();
- first
- .into_iter()
- .map(|e| *e.borrow())
- .filter(|e| !set.contains(e))
- .collect()
-}
-
/// Checks whether the given operator is a limit;
/// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`].
pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
@@ -227,53 +141,3 @@ pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) ->
Vec<String> {
let actual: Vec<&str> = formatted.trim().lines().collect();
actual.iter().map(|elem| elem.to_string()).collect()
}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[tokio::test]
- async fn test_find_indices() -> Result<()> {
- assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
- assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
- assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
- assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
- assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
- Ok(())
- }
-
- #[tokio::test]
- async fn test_merge_and_order_indices() {
- assert_eq!(
- merge_and_order_indices([0, 3, 4], [1, 3, 5]),
- vec![0, 1, 3, 4, 5]
- );
- // Result should be ordered, even if inputs are not
- assert_eq!(
- merge_and_order_indices([3, 0, 4], [5, 1, 3]),
- vec![0, 1, 3, 4, 5]
- );
- }
-
- #[tokio::test]
- async fn test_is_sorted() {
- assert!(is_sorted::<usize>([]));
- assert!(is_sorted([0]));
- assert!(is_sorted([0, 3, 4]));
- assert!(is_sorted([0, 1, 2]));
- assert!(is_sorted([0, 1, 4]));
- assert!(is_sorted([0usize; 0]));
- assert!(is_sorted([1, 2]));
- assert!(!is_sorted([3, 2]));
- }
-
- #[tokio::test]
- async fn test_set_difference() {
- assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
- assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
- // return value should have same ordering with the in1
- assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
- assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
- assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
- }
-}
diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs
index c397fb5d3e..76adf7611d 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -330,6 +330,17 @@ pub fn execute_stream_partitioned(
Ok(streams)
}
+/// Returns whether `plan` produces unbounded output, assuming unbounded (`true`) on error.
+pub fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
+ let children_unbounded_output = plan
+ .children()
+ .iter()
+ .map(unbounded_output)
+ .collect::<Vec<_>>();
+ plan.unbounded_output(&children_unbounded_output)
+ .unwrap_or(true)
+}
+
use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::WindowExpr;
pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index c6211c8061..4108b42205 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -20,6 +20,13 @@
//! the input data seen so far), which makes it appropriate when processing
//! infinite inputs.
+use std::any::Any;
+use std::cmp::{min, Ordering};
+use std::collections::{HashMap, VecDeque};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
use crate::expressions::PhysicalSortExpr;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use crate::windows::{
@@ -29,35 +36,20 @@ use crate::{
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution,
ExecutionPlan,
Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics,
WindowExpr,
};
-use datafusion_common::{exec_err, plan_err, Result};
-use datafusion_execution::TaskContext;
-use ahash::RandomState;
use arrow::{
array::{Array, ArrayRef, UInt32Builder},
compute::{concat, concat_batches, sort_to_indices},
datatypes::{Schema, SchemaBuilder, SchemaRef},
record_batch::RecordBatch,
};
-use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
-use futures::stream::Stream;
-use futures::{ready, StreamExt};
-use hashbrown::raw::RawTable;
-use indexmap::IndexMap;
-use log::debug;
-
-use std::any::Any;
-use std::cmp::{min, Ordering};
-use std::collections::{HashMap, VecDeque};
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-
use datafusion_common::utils::{
evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices,
get_record_batch_at_indices, get_row_at_idx,
};
-use datafusion_common::DataFusionError;
+use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
+use datafusion_execution::TaskContext;
+use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
use datafusion_expr::ColumnarValue;
use datafusion_physical_expr::hash_utils::create_hashes;
use datafusion_physical_expr::window::{
@@ -68,6 +60,13 @@ use datafusion_physical_expr::{
PhysicalSortRequirement,
};
+use ahash::RandomState;
+use futures::stream::Stream;
+use futures::{ready, StreamExt};
+use hashbrown::raw::RawTable;
+use indexmap::IndexMap;
+use log::debug;
+
#[derive(Debug, Clone, PartialEq)]
/// Specifies partition column properties in terms of input ordering
pub enum PartitionSearchMode {
diff --git a/datafusion/physical-plan/src/windows/mod.rs
b/datafusion/physical-plan/src/windows/mod.rs
index 2a2f8d6d21..0f165f7935 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -17,43 +17,49 @@
//! Physical expressions for window functions
+use std::borrow::Borrow;
+use std::convert::TryInto;
+use std::sync::Arc;
+
use crate::{
aggregates,
expressions::{
cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal,
NthValue, Ntile,
PhysicalSortExpr, RowNumber,
},
- udaf, ExecutionPlan, PhysicalExpr,
+ udaf, unbounded_output, ExecutionPlan, PhysicalExpr,
};
+
use arrow::datatypes::Schema;
use arrow_schema::{DataType, Field, SchemaRef};
-use datafusion_common::ScalarValue;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::utils::{
+ find_indices, get_at_indices, is_sorted, longest_consecutive_prefix,
+ merge_and_order_indices, set_difference,
+};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_expr::{
window_function::{BuiltInWindowFunction, WindowFunction},
PartitionEvaluator, WindowFrame, WindowUDF,
};
use datafusion_physical_expr::{
+ equivalence::OrderingEquivalenceBuilder,
+ utils::{convert_to_expr, get_indices_of_matching_exprs},
window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr},
- AggregateExpr,
+ AggregateExpr, OrderingEquivalenceProperties, PhysicalSortRequirement,
};
-use std::borrow::Borrow;
-use std::convert::TryInto;
-use std::sync::Arc;
+
+use itertools::{izip, Itertools};
mod bounded_window_agg_exec;
mod window_agg_exec;
pub use bounded_window_agg_exec::BoundedWindowAggExec;
pub use bounded_window_agg_exec::PartitionSearchMode;
-use datafusion_common::utils::longest_consecutive_prefix;
-use datafusion_physical_expr::equivalence::OrderingEquivalenceBuilder;
-use datafusion_physical_expr::utils::{convert_to_expr,
get_indices_of_matching_exprs};
+pub use window_agg_exec::WindowAggExec;
+
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
-use datafusion_physical_expr::{OrderingEquivalenceProperties,
PhysicalSortRequirement};
-pub use window_agg_exec::WindowAggExec;
/// Create a physical expression for window function
pub fn create_window_expr(
@@ -355,19 +361,266 @@ pub(crate) fn window_ordering_equivalence(
}
builder.build()
}
+
+/// Constructs the best-fitting windowing operator (a `WindowAggExec` or a
+/// `BoundedWindowAggExec`) for the given `input` according to the specifications
+/// of `window_exprs` and `physical_partition_keys`. Here, best-fitting means
+/// not requiring additional sorting and/or partitioning for the given input.
+/// - A return value of `None` represents that there is no way to construct a
+/// windowing operator that doesn't need additional sorting/partitioning for
+/// the given input. Existing ordering should be changed to run the given
+/// windowing operation.
+/// - A `Some(window exec)` value contains the optimal windowing operator (a
+/// `WindowAggExec` or a `BoundedWindowAggExec`) for the given input.
+pub fn get_best_fitting_window(
+ window_exprs: &[Arc<dyn WindowExpr>],
+ input: &Arc<dyn ExecutionPlan>,
+ // These are the partition keys used during repartitioning.
+ // They are either the same as `window_expr`'s PARTITION BY columns,
+ // or it is empty if partitioning is not desirable for this windowing
operator.
+ physical_partition_keys: &[Arc<dyn PhysicalExpr>],
+) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ // Contains at least one window expr and all of the partition by and order
by sections
+ // of the window_exprs are same.
+ let partitionby_exprs = window_exprs[0].partition_by();
+ let orderby_keys = window_exprs[0].order_by();
+ let (should_reverse, partition_search_mode) =
+ if let Some((should_reverse, partition_search_mode)) =
+ can_skip_sort(partitionby_exprs, orderby_keys, input)?
+ {
+ (should_reverse, partition_search_mode)
+ } else {
+ return Ok(None);
+ };
+ let is_unbounded = unbounded_output(input);
+ if !is_unbounded && partition_search_mode != PartitionSearchMode::Sorted {
+ // Executor has bounded input and `partition_search_mode` is not
`PartitionSearchMode::Sorted`
+ // in this case removing the sort is not helpful, return:
+ return Ok(None);
+ };
+
+ let window_expr = if should_reverse {
+ if let Some(reversed_window_expr) = window_exprs
+ .iter()
+ .map(|e| e.get_reverse_expr())
+ .collect::<Option<Vec<_>>>()
+ {
+ reversed_window_expr
+ } else {
+ // Cannot take reverse of any of the window expr
+ // In this case, with existing ordering window cannot be run
+ return Ok(None);
+ }
+ } else {
+ window_exprs.to_vec()
+ };
+
+ // If all window expressions can run with bounded memory, choose the
+ // bounded window variant:
+ if window_expr.iter().all(|e| e.uses_bounded_memory()) {
+ Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
+ window_expr,
+ input.clone(),
+ input.schema(),
+ physical_partition_keys.to_vec(),
+ partition_search_mode,
+ )?) as _))
+ } else if partition_search_mode != PartitionSearchMode::Sorted {
+ // For `WindowAggExec` to work correctly PARTITION BY columns should
be sorted.
+ // Hence, if `partition_search_mode` is not
`PartitionSearchMode::Sorted` we should convert
+ // input ordering such that it can work with
PartitionSearchMode::Sorted (add `SortExec`).
+ // Effectively `WindowAggExec` works only in
PartitionSearchMode::Sorted mode.
+ Ok(None)
+ } else {
+ Ok(Some(Arc::new(WindowAggExec::try_new(
+ window_expr,
+ input.clone(),
+ input.schema(),
+ physical_partition_keys.to_vec(),
+ )?) as _))
+ }
+}
+
+/// Compares physical ordering (output ordering of the `input` operator) with
+/// `partitionby_exprs` and `orderby_keys` to decide whether existing ordering
+/// is sufficient to run the current window operator.
+/// - A `None` return value indicates that we cannot remove the sort in
question
+/// (input ordering is not sufficient to run current window executor).
+/// - A `Some((bool, PartitionSearchMode))` value indicates that the window
operator
+/// can run with existing input ordering, so we can remove `SortExec` before
it.
+/// The `bool` field in the return value represents whether we should reverse
window
+/// operator to remove `SortExec` before it. The `PartitionSearchMode` field
represents
+/// the mode this window operator should work in to accommodate the existing
ordering.
+fn can_skip_sort(
+ partitionby_exprs: &[Arc<dyn PhysicalExpr>],
+ orderby_keys: &[PhysicalSortExpr],
+ input: &Arc<dyn ExecutionPlan>,
+) -> Result<Option<(bool, PartitionSearchMode)>> {
+ let physical_ordering = if let Some(physical_ordering) =
input.output_ordering() {
+ physical_ordering
+ } else {
+ // If there is no physical ordering, there is no way to remove a
+ // sort, so immediately return.
+ return Ok(None);
+ };
+ let orderby_exprs = convert_to_expr(orderby_keys);
+ let physical_ordering_exprs = convert_to_expr(physical_ordering);
+ let equal_properties = || input.equivalence_properties();
+ // Get the indices of the ORDER BY expressions among input ordering
expressions:
+ let ob_indices = get_indices_of_matching_exprs(
+ &orderby_exprs,
+ &physical_ordering_exprs,
+ equal_properties,
+ );
+ if ob_indices.len() != orderby_exprs.len() {
+ // If all order by expressions are not in the input ordering,
+ // there is no way to remove a sort -- immediately return:
+ return Ok(None);
+ }
+ // Get the indices of the PARTITION BY expressions among input ordering
expressions:
+ let pb_indices = get_indices_of_matching_exprs(
+ partitionby_exprs,
+ &physical_ordering_exprs,
+ equal_properties,
+ );
+ let ordered_merged_indices = merge_and_order_indices(&pb_indices,
&ob_indices);
+ // Get the indices of the ORDER BY columns that don't appear in the
+ // PARTITION BY clause; i.e. calculate (ORDER BY columns) ∖ (PARTITION
+ // BY columns) where `∖` represents set difference.
+ let unique_ob_indices = set_difference(&ob_indices, &pb_indices);
+ if !is_sorted(&unique_ob_indices) {
+ // ORDER BY indices should be ascending ordered
+ return Ok(None);
+ }
+ let first_n = longest_consecutive_prefix(ordered_merged_indices);
+ let furthest_ob_index = *unique_ob_indices.last().unwrap_or(&0);
+ // Cannot skip sort if last order by index is not within consecutive
prefix.
+ // For instance, if input is ordered by a, b, c, d for the expression
+ // `PARTITION BY a, ORDER BY b, d`, then `first_n` would be 2 (meaning a,
b defines a
+ // prefix for input ordering). However, `furthest_ob_index` would be 3 as
column d
+ // occurs at the 3rd index of the existing ordering. Hence, existing
ordering would
+ // not be sufficient to run the current operator.
+ // However, for expression `PARTITION BY a, ORDER BY b, c, d`, `first_n`
would be 4 (meaning
+ // a, b, c, d defines a prefix for input ordering). Similarly,
`furthest_ob_index` would be
+ // 3 as column d occurs at the 3rd index of the existing ordering.
Therefore, the existing
+ // ordering would be sufficient to run the current operator.
+ if first_n <= furthest_ob_index {
+ return Ok(None);
+ }
+ let input_orderby_columns = get_at_indices(physical_ordering,
&unique_ob_indices)?;
+ let expected_orderby_columns =
+ get_at_indices(orderby_keys, find_indices(&ob_indices,
&unique_ob_indices)?)?;
+ let should_reverse = if let Some(should_reverse) = check_alignments(
+ &input.schema(),
+ &input_orderby_columns,
+ &expected_orderby_columns,
+ )? {
+ should_reverse
+ } else {
+ // If ordering directions are not aligned, we cannot calculate the
+ // result without changing existing ordering.
+ return Ok(None);
+ };
+
+ let ordered_pb_indices =
pb_indices.iter().copied().sorted().collect::<Vec<_>>();
+ // Determine how many elements in the PARTITION BY columns defines a
consecutive range from zero.
+ let first_n = longest_consecutive_prefix(&ordered_pb_indices);
+ let mode = if first_n == partitionby_exprs.len() {
+ // All of the PARTITION BY columns defines a consecutive range from
zero.
+ PartitionSearchMode::Sorted
+ } else if first_n > 0 {
+ // Only some of the PARTITION BY columns define a consecutive range from
zero.
+ let ordered_range = &ordered_pb_indices[0..first_n];
+ let input_pb_exprs = get_at_indices(&physical_ordering_exprs,
ordered_range)?;
+ let partially_ordered_indices = get_indices_of_matching_exprs(
+ &input_pb_exprs,
+ partitionby_exprs,
+ equal_properties,
+ );
+ PartitionSearchMode::PartiallySorted(partially_ordered_indices)
+ } else {
+ // None of the PARTITION BY columns defines a consecutive range from
zero.
+ PartitionSearchMode::Linear
+ };
+
+ Ok(Some((should_reverse, mode)))
+}
+
+/// Compares all the orderings in `physical_ordering` and `required`, decides
+/// whether alignments match. A `None` return value indicates that current
+/// column is not aligned. A `Some(bool)` value indicates otherwise, and
signals
+/// whether we should reverse the window expression in order to avoid sorting.
+fn check_alignments(
+ schema: &SchemaRef,
+ physical_ordering: &[PhysicalSortExpr],
+ required: &[PhysicalSortExpr],
+) -> Result<Option<bool>> {
+ let result = izip!(physical_ordering, required)
+ .map(|(lhs, rhs)| check_alignment(schema, lhs, rhs))
+ .collect::<Result<Option<Vec<_>>>>()?;
+ Ok(if let Some(res) = result {
+ if !res.is_empty() {
+ let first = res[0];
+ let all_same = res.into_iter().all(|elem| elem == first);
+ all_same.then_some(first)
+ } else {
+ Some(false)
+ }
+ } else {
+ // Cannot skip some of the requirements in the input.
+ None
+ })
+}
+
+/// Compares `physical_ordering` and `required` ordering, decides whether
+/// alignments match. A `None` return value indicates that current column is
+/// not aligned. A `Some(bool)` value indicates otherwise, and signals whether
+/// we should reverse the window expression in order to avoid sorting.
+fn check_alignment(
+ input_schema: &SchemaRef,
+ physical_ordering: &PhysicalSortExpr,
+ required: &PhysicalSortExpr,
+) -> Result<Option<bool>> {
+ Ok(if required.expr.eq(&physical_ordering.expr) {
+ let physical_opts = physical_ordering.options;
+ let required_opts = required.options;
+ if required.expr.nullable(input_schema)? {
+ let reverse = physical_opts == !required_opts;
+ (reverse || physical_opts == required_opts).then_some(reverse)
+ } else {
+ // If the column is not nullable, NULLS FIRST/LAST is not
important.
+ Some(physical_opts.descending != required_opts.descending)
+ }
+ } else {
+ None
+ })
+}
+
#[cfg(test)]
mod tests {
use super::*;
use crate::aggregates::AggregateFunction;
use crate::collect;
use crate::expressions::col;
+ use crate::streaming::StreamingTableExec;
use crate::test::assert_is_pending;
use crate::test::exec::{assert_strong_count_converges_to_zero,
BlockingExec};
+ use crate::windows::PartitionSearchMode::{Linear, PartiallySorted, Sorted};
+
use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaRef};
use datafusion_execution::TaskContext;
+
use futures::FutureExt;
+ fn create_test_schema() -> Result<SchemaRef> {
+ let nullable_column = Field::new("nullable_col", DataType::Int32,
true);
+ let non_nullable_column = Field::new("non_nullable_col",
DataType::Int32, false);
+ let schema = Arc::new(Schema::new(vec![nullable_column,
non_nullable_column]));
+
+ Ok(schema)
+ }
+
fn create_test_schema2() -> Result<SchemaRef> {
let a = Field::new("a", DataType::Int32, true);
let b = Field::new("b", DataType::Int32, true);
@@ -378,6 +631,51 @@ mod tests {
Ok(schema)
}
+ // Generate a schema which consists of 5 columns (a, b, c, d, e)
+ fn create_test_schema3() -> Result<SchemaRef> {
+ let a = Field::new("a", DataType::Int32, true);
+ let b = Field::new("b", DataType::Int32, false);
+ let c = Field::new("c", DataType::Int32, true);
+ let d = Field::new("d", DataType::Int32, false);
+ let e = Field::new("e", DataType::Int32, false);
+ let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+ Ok(schema)
+ }
+
+ /// make PhysicalSortExpr with default options
+ pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+ sort_expr_options(name, schema, SortOptions::default())
+ }
+
+ /// PhysicalSortExpr with specified options
+ pub fn sort_expr_options(
+ name: &str,
+ schema: &Schema,
+ options: SortOptions,
+ ) -> PhysicalSortExpr {
+ PhysicalSortExpr {
+ expr: col(name, schema).unwrap(),
+ options,
+ }
+ }
+
+ /// Creates a sorted Streaming Table exec
+ pub fn streaming_table_exec(
+ schema: &SchemaRef,
+ sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+ infinite_source: bool,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let sort_exprs = sort_exprs.into_iter().collect();
+
+ Ok(Arc::new(StreamingTableExec::try_new(
+ schema.clone(),
+ vec![],
+ None,
+ Some(sort_exprs),
+ infinite_source,
+ )?))
+ }
+
#[tokio::test]
async fn test_calc_requirements() -> Result<()> {
let schema = create_test_schema2()?;
@@ -474,4 +772,362 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_is_column_aligned_nullable() -> Result<()> {
+ let schema = create_test_schema()?;
+ let params = vec![
+ ((true, true), (false, false), Some(true)),
+ ((true, true), (false, true), None),
+ ((true, true), (true, false), None),
+ ((true, false), (false, true), Some(true)),
+ ((true, false), (false, false), None),
+ ((true, false), (true, true), None),
+ ];
+ for (
+ (physical_desc, physical_nulls_first),
+ (req_desc, req_nulls_first),
+ expected,
+ ) in params
+ {
+ let physical_ordering = PhysicalSortExpr {
+ expr: col("nullable_col", &schema)?,
+ options: SortOptions {
+ descending: physical_desc,
+ nulls_first: physical_nulls_first,
+ },
+ };
+ let required_ordering = PhysicalSortExpr {
+ expr: col("nullable_col", &schema)?,
+ options: SortOptions {
+ descending: req_desc,
+ nulls_first: req_nulls_first,
+ },
+ };
+ let res = check_alignment(&schema, &physical_ordering,
&required_ordering)?;
+ assert_eq!(res, expected);
+ }
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_is_column_aligned_non_nullable() -> Result<()> {
+ let schema = create_test_schema()?;
+
+ let params = vec![
+ ((true, true), (false, false), Some(true)),
+ ((true, true), (false, true), Some(true)),
+ ((true, true), (true, false), Some(false)),
+ ((true, false), (false, true), Some(true)),
+ ((true, false), (false, false), Some(true)),
+ ((true, false), (true, true), Some(false)),
+ ];
+ for (
+ (physical_desc, physical_nulls_first),
+ (req_desc, req_nulls_first),
+ expected,
+ ) in params
+ {
+ let physical_ordering = PhysicalSortExpr {
+ expr: col("non_nullable_col", &schema)?,
+ options: SortOptions {
+ descending: physical_desc,
+ nulls_first: physical_nulls_first,
+ },
+ };
+ let required_ordering = PhysicalSortExpr {
+ expr: col("non_nullable_col", &schema)?,
+ options: SortOptions {
+ descending: req_desc,
+ nulls_first: req_nulls_first,
+ },
+ };
+ let res = check_alignment(&schema, &physical_ordering,
&required_ordering)?;
+ assert_eq!(res, expected);
+ }
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_can_skip_ordering_exhaustive() -> Result<()> {
+ let test_schema = create_test_schema3()?;
+ // Columns a,c are nullable whereas b,d are not nullable.
+ // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC
NULLS FIRST, d ASC NULLS FIRST
+ // Column e is not ordered.
+ let sort_exprs = vec![
+ sort_expr("a", &test_schema),
+ sort_expr("b", &test_schema),
+ sort_expr("c", &test_schema),
+ sort_expr("d", &test_schema),
+ ];
+ let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs,
true)?;
+
+ // Test cases consist of a vector of tuples, where each tuple
represents a single test case.
+ // First field in the tuple is Vec<str> where each element in the
vector represents PARTITION BY columns
+ // For instance `vec!["a", "b"]` corresponds to PARTITION BY a, b
+ // Second field in the tuple is Vec<str> where each element in the
vector represents ORDER BY columns
+ // For instance, vec!["c"], corresponds to ORDER BY c ASC NULLS FIRST,
(ordering is default ordering. We do not check
+ // for reversibility in this test).
+ // Third field in the tuple is Option<PartitionSearchMode>, which
corresponds to expected algorithm mode.
+ // None represents that existing ordering is not sufficient to run
executor with any one of the algorithms
+ // (We need to add SortExec to be able to run it).
+ // Some(PartitionSearchMode) represents, we can run algorithm with
existing ordering; and algorithm should work in
+ // PartitionSearchMode.
+ let test_cases = vec![
+ (vec!["a"], vec!["a"], Some(Sorted)),
+ (vec!["a"], vec!["b"], Some(Sorted)),
+ (vec!["a"], vec!["c"], None),
+ (vec!["a"], vec!["a", "b"], Some(Sorted)),
+ (vec!["a"], vec!["b", "c"], Some(Sorted)),
+ (vec!["a"], vec!["a", "c"], None),
+ (vec!["a"], vec!["a", "b", "c"], Some(Sorted)),
+ (vec!["b"], vec!["a"], Some(Linear)),
+ (vec!["b"], vec!["b"], None),
+ (vec!["b"], vec!["c"], None),
+ (vec!["b"], vec!["a", "b"], Some(Linear)),
+ (vec!["b"], vec!["b", "c"], None),
+ (vec!["b"], vec!["a", "c"], Some(Linear)),
+ (vec!["b"], vec!["a", "b", "c"], Some(Linear)),
+ (vec!["c"], vec!["a"], Some(Linear)),
+ (vec!["c"], vec!["b"], None),
+ (vec!["c"], vec!["c"], None),
+ (vec!["c"], vec!["a", "b"], Some(Linear)),
+ (vec!["c"], vec!["b", "c"], None),
+ (vec!["c"], vec!["a", "c"], Some(Linear)),
+ (vec!["c"], vec!["a", "b", "c"], Some(Linear)),
+ (vec!["b", "a"], vec!["a"], Some(Sorted)),
+ (vec!["b", "a"], vec!["b"], Some(Sorted)),
+ (vec!["b", "a"], vec!["c"], Some(Sorted)),
+ (vec!["b", "a"], vec!["a", "b"], Some(Sorted)),
+ (vec!["b", "a"], vec!["b", "c"], Some(Sorted)),
+ (vec!["b", "a"], vec!["a", "c"], Some(Sorted)),
+ (vec!["b", "a"], vec!["a", "b", "c"], Some(Sorted)),
+ (vec!["c", "b"], vec!["a"], Some(Linear)),
+ (vec!["c", "b"], vec!["b"], None),
+ (vec!["c", "b"], vec!["c"], None),
+ (vec!["c", "b"], vec!["a", "b"], Some(Linear)),
+ (vec!["c", "b"], vec!["b", "c"], None),
+ (vec!["c", "b"], vec!["a", "c"], Some(Linear)),
+ (vec!["c", "b"], vec!["a", "b", "c"], Some(Linear)),
+ (vec!["c", "a"], vec!["a"], Some(PartiallySorted(vec![1]))),
+ (vec!["c", "a"], vec!["b"], Some(PartiallySorted(vec![1]))),
+ (vec!["c", "a"], vec!["c"], Some(PartiallySorted(vec![1]))),
+ (
+ vec!["c", "a"],
+ vec!["a", "b"],
+ Some(PartiallySorted(vec![1])),
+ ),
+ (
+ vec!["c", "a"],
+ vec!["b", "c"],
+ Some(PartiallySorted(vec![1])),
+ ),
+ (
+ vec!["c", "a"],
+ vec!["a", "c"],
+ Some(PartiallySorted(vec![1])),
+ ),
+ (
+ vec!["c", "a"],
+ vec!["a", "b", "c"],
+ Some(PartiallySorted(vec![1])),
+ ),
+ (vec!["c", "b", "a"], vec!["a"], Some(Sorted)),
+ (vec!["c", "b", "a"], vec!["b"], Some(Sorted)),
+ (vec!["c", "b", "a"], vec!["c"], Some(Sorted)),
+ (vec!["c", "b", "a"], vec!["a", "b"], Some(Sorted)),
+ (vec!["c", "b", "a"], vec!["b", "c"], Some(Sorted)),
+ (vec!["c", "b", "a"], vec!["a", "c"], Some(Sorted)),
+ (vec!["c", "b", "a"], vec!["a", "b", "c"], Some(Sorted)),
+ ];
+ for (case_idx, test_case) in test_cases.iter().enumerate() {
+ let (partition_by_columns, order_by_params, expected) = &test_case;
+ let mut partition_by_exprs = vec![];
+ for col_name in partition_by_columns {
+ partition_by_exprs.push(col(col_name, &test_schema)?);
+ }
+
+ let mut order_by_exprs = vec![];
+ for col_name in order_by_params {
+ let expr = col(col_name, &test_schema)?;
+ // Give default ordering, this is same with input ordering
direction
+ // In this test we do not check for reversibility.
+ let options = SortOptions::default();
+ order_by_exprs.push(PhysicalSortExpr { expr, options });
+ }
+ let res =
+ can_skip_sort(&partition_by_exprs, &order_by_exprs,
&exec_unbounded)?;
+ // Since reversibility is not important in this test. Convert
Option<(bool, PartitionSearchMode)> to Option<PartitionSearchMode>
+ let res = res.map(|(_, mode)| mode);
+ assert_eq!(
+ res, *expected,
+ "Unexpected result for in unbounded test case#: {case_idx:?},
case: {test_case:?}"
+ );
+ }
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_can_skip_ordering() -> Result<()> {
+ let test_schema = create_test_schema3()?;
+ // Columns a,c are nullable whereas b,d are not nullable.
+ // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC
NULLS FIRST, d ASC NULLS FIRST
+ // Column e is not ordered.
+ let sort_exprs = vec![
+ sort_expr("a", &test_schema),
+ sort_expr("b", &test_schema),
+ sort_expr("c", &test_schema),
+ sort_expr("d", &test_schema),
+ ];
+ let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs,
true)?;
+
+ // test cases consist of a vector of tuples, where each tuple
represents a single test case.
+ // First field in the tuple is Vec<str> where each element in the
vector represents PARTITION BY columns
+ // For instance `vec!["a", "b"]` corresponds to PARTITION BY a, b
+ // Second field in the tuple is Vec<(str, bool, bool)> where each
element in the vector represents ORDER BY columns
+ // For instance, vec![("c", false, false)], corresponds to ORDER BY c
ASC NULLS LAST,
+ // similarly, vec![("c", true, true)], corresponds to ORDER BY c DESC
NULLS FIRST,
+ // Third field in the tuple is Option<(bool, PartitionSearchMode)>,
which corresponds to expected result.
+ // None represents that existing ordering is not sufficient to run
executor with any one of the algorithms
+ // (We need to add SortExec to be able to run it).
+ // Some((bool, PartitionSearchMode)) represents, we can run algorithm
with existing ordering. Algorithm should work in
+ // PartitionSearchMode, bool field represents whether we should
reverse window expressions to run executor with existing ordering.
+ // For instance, `Some((false, PartitionSearchMode::Sorted))`,
represents that we shouldn't reverse window expressions. And algorithm
+ // should work in Sorted mode to work with existing ordering.
+ let test_cases = vec![
+ // PARTITION BY a, b ORDER BY c ASC NULLS LAST
+ (vec!["a", "b"], vec![("c", false, false)], None),
+ // ORDER BY c ASC NULLS FIRST
+ (vec![], vec![("c", false, true)], None),
+ // PARTITION BY b, ORDER BY c ASC NULLS FIRST
+ (vec!["b"], vec![("c", false, true)], None),
+ // PARTITION BY a, ORDER BY c ASC NULLS FIRST
+ (vec!["a"], vec![("c", false, true)], None),
+ // PARTITION BY a, b ORDER BY c ASC NULLS FIRST, e ASC NULLS FIRST
+ (
+ vec!["a", "b"],
+ vec![("c", false, true), ("e", false, true)],
+ None,
+ ),
+ // PARTITION BY a, ORDER BY b ASC NULLS FIRST
+ (vec!["a"], vec![("b", false, true)], Some((false, Sorted))),
+ // PARTITION BY a, ORDER BY a ASC NULLS FIRST
+ (vec!["a"], vec![("a", false, true)], Some((false, Sorted))),
+ // PARTITION BY a, ORDER BY a ASC NULLS LAST
+ (vec!["a"], vec![("a", false, false)], Some((false, Sorted))),
+ // PARTITION BY a, ORDER BY a DESC NULLS FIRST
+ (vec!["a"], vec![("a", true, true)], Some((false, Sorted))),
+ // PARTITION BY a, ORDER BY a DESC NULLS LAST
+ (vec!["a"], vec![("a", true, false)], Some((false, Sorted))),
+ // PARTITION BY a, ORDER BY b ASC NULLS LAST
+ (vec!["a"], vec![("b", false, false)], Some((false, Sorted))),
+ // PARTITION BY a, ORDER BY b DESC NULLS LAST
+ (vec!["a"], vec![("b", true, false)], Some((true, Sorted))),
+ // PARTITION BY a, b ORDER BY c ASC NULLS FIRST
+ (
+ vec!["a", "b"],
+ vec![("c", false, true)],
+ Some((false, Sorted)),
+ ),
+ // PARTITION BY b, a ORDER BY c ASC NULLS FIRST
+ (
+ vec!["b", "a"],
+ vec![("c", false, true)],
+ Some((false, Sorted)),
+ ),
+ // PARTITION BY a, b ORDER BY c DESC NULLS LAST
+ (
+ vec!["a", "b"],
+ vec![("c", true, false)],
+ Some((true, Sorted)),
+ ),
+ // PARTITION BY e ORDER BY a ASC NULLS FIRST
+ (
+ vec!["e"],
+ vec![("a", false, true)],
+ // For unbounded, expects to work in Linear mode. Shouldn't
reverse window function.
+ Some((false, Linear)),
+ ),
+ // PARTITION BY b, c ORDER BY a ASC NULLS FIRST, c ASC NULLS FIRST
+ (
+ vec!["b", "c"],
+ vec![("a", false, true), ("c", false, true)],
+ Some((false, Linear)),
+ ),
+ // PARTITION BY b ORDER BY a ASC NULLS FIRST
+ (vec!["b"], vec![("a", false, true)], Some((false, Linear))),
+ // PARTITION BY a, e ORDER BY b ASC NULLS FIRST
+ (
+ vec!["a", "e"],
+ vec![("b", false, true)],
+ Some((false, PartiallySorted(vec![0]))),
+ ),
+ // PARTITION BY a, c ORDER BY b ASC NULLS FIRST
+ (
+ vec!["a", "c"],
+ vec![("b", false, true)],
+ Some((false, PartiallySorted(vec![0]))),
+ ),
+ // PARTITION BY c, a ORDER BY b ASC NULLS FIRST
+ (
+ vec!["c", "a"],
+ vec![("b", false, true)],
+ Some((false, PartiallySorted(vec![1]))),
+ ),
+ // PARTITION BY d, b, a ORDER BY c ASC NULLS FIRST
+ (
+ vec!["d", "b", "a"],
+ vec![("c", false, true)],
+ Some((false, PartiallySorted(vec![2, 1]))),
+ ),
+ // PARTITION BY e, b, a ORDER BY c ASC NULLS FIRST
+ (
+ vec!["e", "b", "a"],
+ vec![("c", false, true)],
+ Some((false, PartiallySorted(vec![2, 1]))),
+ ),
+ // PARTITION BY d, a ORDER BY b ASC NULLS FIRST
+ (
+ vec!["d", "a"],
+ vec![("b", false, true)],
+ Some((false, PartiallySorted(vec![1]))),
+ ),
+ // PARTITION BY a ORDER BY b ASC NULLS FIRST, a ASC NULLS FIRST
+ (
+ vec!["a"],
+ vec![("b", false, true), ("a", false, true)],
+ Some((false, Sorted)),
+ ),
+ // ORDER BY b, a ASC NULLS FIRST
+ (vec![], vec![("b", false, true), ("a", false, true)], None),
+ ];
+ for (case_idx, test_case) in test_cases.iter().enumerate() {
+ let (partition_by_columns, order_by_params, expected) = &test_case;
+ let mut partition_by_exprs = vec![];
+ for col_name in partition_by_columns {
+ partition_by_exprs.push(col(col_name, &test_schema)?);
+ }
+
+ let mut order_by_exprs = vec![];
+ for (col_name, descending, nulls_first) in order_by_params {
+ let expr = col(col_name, &test_schema)?;
+ let options = SortOptions {
+ descending: *descending,
+ nulls_first: *nulls_first,
+ };
+ order_by_exprs.push(PhysicalSortExpr { expr, options });
+ }
+
+ assert_eq!(
+ can_skip_sort(&partition_by_exprs, &order_by_exprs,
&exec_unbounded)?,
+ *expected,
+ "Unexpected result for in unbounded test case#: {case_idx:?},
case: {test_case:?}"
+ );
+ }
+
+ Ok(())
+ }
}
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs
b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index da43f127f0..b56a9c194c 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -17,6 +17,11 @@
//! Stream and channel implementations for window function expressions.
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
use crate::common::transpose;
use crate::expressions::PhysicalSortExpr;
use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
@@ -28,6 +33,7 @@ use crate::{
ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
SendableRecordBatchStream, Statistics, WindowExpr,
};
+
use arrow::compute::{concat, concat_batches};
use arrow::datatypes::SchemaBuilder;
use arrow::error::ArrowError;
@@ -37,16 +43,12 @@ use arrow::{
record_batch::RecordBatch,
};
use datafusion_common::utils::{evaluate_partition_ranges, get_at_indices};
-use datafusion_common::Result;
-use datafusion_common::{internal_err, plan_err, DataFusionError};
+use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{OrderingEquivalenceProperties,
PhysicalSortRequirement};
+
use futures::stream::Stream;
use futures::{ready, StreamExt};
-use std::any::Any;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
/// Window execution plan
#[derive(Debug)]