This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 26a36024c2 Move window analysis to the window method (#7672)
26a36024c2 is described below

commit 26a36024c27c5dacdf7ec2d530a5de7673a41f5f
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Sep 28 20:24:48 2023 +0300

    Move window analysis to the window method (#7672)
    
    * Move window analysis to the method
    
    * Preserve physical partition keys, during recreation of the window
    
    * Final review
    
    * Avoid cloning if not necessary
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/common/src/utils.rs                     | 104 ++++
 .../src/physical_optimizer/enforce_distribution.rs |   7 +-
 .../core/src/physical_optimizer/enforce_sorting.rs | 661 ++------------------
 .../replace_with_order_preserving_variants.rs      |   9 +-
 datafusion/core/src/physical_optimizer/utils.rs    | 136 -----
 datafusion/physical-plan/src/lib.rs                |  11 +
 .../src/windows/bounded_window_agg_exec.rs         |  35 +-
 datafusion/physical-plan/src/windows/mod.rs        | 680 ++++++++++++++++++++-
 .../physical-plan/src/windows/window_agg_exec.rs   |  14 +-
 9 files changed, 873 insertions(+), 784 deletions(-)

diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 6141723b06..b7c80aa9ac 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -28,6 +28,7 @@ use sqlparser::dialect::GenericDialect;
 use sqlparser::parser::Parser;
 use std::borrow::{Borrow, Cow};
 use std::cmp::Ordering;
+use std::collections::HashSet;
 use std::ops::Range;
 use std::sync::Arc;
 
@@ -429,6 +430,64 @@ pub mod datafusion_strsim {
     }
 }
 
+/// Merges collections `first` and `second`, removes duplicates and sorts the
+/// result, returning it as a [`Vec`].
+pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
+    first: impl IntoIterator<Item = T>,
+    second: impl IntoIterator<Item = S>,
+) -> Vec<usize> {
+    let mut result: Vec<_> = first
+        .into_iter()
+        .map(|e| *e.borrow())
+        .chain(second.into_iter().map(|e| *e.borrow()))
+        .collect::<HashSet<_>>()
+        .into_iter()
+        .collect();
+    result.sort();
+    result
+}
+
+/// Calculates the set difference between sequences `first` and `second`,
+/// returning the result as a [`Vec`]. Preserves the ordering of `first`.
+pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
+    first: impl IntoIterator<Item = T>,
+    second: impl IntoIterator<Item = S>,
+) -> Vec<usize> {
+    let set: HashSet<_> = second.into_iter().map(|e| *e.borrow()).collect();
+    first
+        .into_iter()
+        .map(|e| *e.borrow())
+        .filter(|e| !set.contains(e))
+        .collect()
+}
+
+/// Checks whether the given index sequence is monotonically non-decreasing.
+pub fn is_sorted<T: Borrow<usize>>(sequence: impl IntoIterator<Item = T>) -> bool {
+    // TODO: Remove this function when `is_sorted` graduates from Rust nightly.
+    let mut previous = 0;
+    for item in sequence.into_iter() {
+        let current = *item.borrow();
+        if current < previous {
+            return false;
+        }
+        previous = current;
+    }
+    true
+}
+
+/// Find indices of each element in `targets` inside `items`. If one of the
+/// elements is absent in `items`, returns an error.
+pub fn find_indices<T: PartialEq, S: Borrow<T>>(
+    items: &[T],
+    targets: impl IntoIterator<Item = S>,
+) -> Result<Vec<usize>> {
+    targets
+        .into_iter()
+        .map(|target| items.iter().position(|e| target.borrow().eq(e)))
+        .collect::<Option<_>>()
+        .ok_or_else(|| DataFusionError::Execution("Target not found".to_string()))
+}
+
 #[cfg(test)]
 mod tests {
     use crate::ScalarValue;
@@ -747,4 +806,49 @@ mod tests {
             "cloned `Arc` should point to same data as the original"
         );
     }
+
+    #[test]
+    fn test_merge_and_order_indices() {
+        assert_eq!(
+            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
+            vec![0, 1, 3, 4, 5]
+        );
+        // Result should be ordered, even if inputs are not
+        assert_eq!(
+            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
+            vec![0, 1, 3, 4, 5]
+        );
+    }
+
+    #[test]
+    fn test_set_difference() {
+        assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
+        assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
+        // return value should have same ordering with the in1
+        assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
+        assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
+        assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
+    }
+
+    #[test]
+    fn test_is_sorted() {
+        assert!(is_sorted::<usize>([]));
+        assert!(is_sorted([0]));
+        assert!(is_sorted([0, 3, 4]));
+        assert!(is_sorted([0, 1, 2]));
+        assert!(is_sorted([0, 1, 4]));
+        assert!(is_sorted([0usize; 0]));
+        assert!(is_sorted([1, 2]));
+        assert!(!is_sorted([3, 2]));
+    }
+
+    #[test]
+    fn test_find_indices() -> Result<()> {
+        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
+        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
+        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
+        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
+        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
+        Ok(())
+    }
 }
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index fea7f8c5d8..b2a1a03383 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -28,9 +28,7 @@ use std::sync::Arc;
 use crate::config::ConfigOptions;
 use crate::datasource::physical_plan::{CsvExec, ParquetExec};
 use crate::error::{DataFusionError, Result};
-use crate::physical_optimizer::utils::{
-    add_sort_above, get_plan_string, unbounded_output, ExecTree,
-};
+use crate::physical_optimizer::utils::{add_sort_above, get_plan_string, ExecTree};
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -46,6 +44,7 @@ use crate::physical_plan::windows::WindowAggExec;
 use crate::physical_plan::Partitioning;
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
 
+use datafusion_common::internal_err;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_expr::logical_plan::JoinType;
 use datafusion_physical_expr::equivalence::EquivalenceProperties;
@@ -56,8 +55,8 @@ use datafusion_physical_expr::utils::{
 use datafusion_physical_expr::{
     expr_list_eq_strict_order, PhysicalExpr, PhysicalSortRequirement,
 };
+use datafusion_physical_plan::unbounded_output;
 
-use datafusion_common::internal_err;
 use itertools::izip;
 
 /// The `EnforceDistribution` rule ensures that distribution requirements are
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index b6f2adac1b..a149330181 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -43,30 +43,26 @@ use crate::physical_optimizer::replace_with_order_preserving_variants::{
 };
 use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown};
 use crate::physical_optimizer::utils::{
-    add_sort_above, find_indices, is_coalesce_partitions, is_limit, is_repartition,
-    is_sort, is_sort_preserving_merge, is_sorted, is_union, is_window,
-    merge_and_order_indices, set_difference, unbounded_output, ExecTree,
+    add_sort_above, is_coalesce_partitions, is_limit, is_repartition, is_sort,
+    is_sort_preserving_merge, is_union, is_window, ExecTree,
 };
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use crate::physical_plan::sorts::sort::SortExec;
 use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::physical_plan::windows::{
-    BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
+    get_best_fitting_window, BoundedWindowAggExec, PartitionSearchMode, WindowAggExec,
 };
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, ExecutionPlan};
 
-use arrow::datatypes::SchemaRef;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
-use datafusion_common::utils::{get_at_indices, longest_consecutive_prefix};
 use datafusion_common::{plan_err, DataFusionError};
 use datafusion_physical_expr::utils::{
-    convert_to_expr, get_indices_of_matching_exprs, ordering_satisfy,
-    ordering_satisfy_requirement_concrete,
+    ordering_satisfy, ordering_satisfy_requirement_concrete,
 };
-use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
+use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
 
-use itertools::{izip, Itertools};
+use itertools::izip;
 
 /// This rule inspects [`SortExec`]'s in the given physical plan and removes the
 /// ones it can prove unnecessary.
@@ -564,106 +560,73 @@ fn analyze_window_sort_removal(
     sort_tree: &mut ExecTree,
     window_exec: &Arc<dyn ExecutionPlan>,
 ) -> Result<Option<PlanWithCorrespondingSort>> {
-    let (window_expr, partition_keys) =
+    let requires_single_partition = matches!(
+        window_exec.required_input_distribution()[sort_tree.idx],
+        Distribution::SinglePartition
+    );
+    let mut window_child =
+        remove_corresponding_sort_from_sub_plan(sort_tree, requires_single_partition)?;
+
+    let (window_expr, new_window) =
         if let Some(exec) = window_exec.as_any().downcast_ref::<BoundedWindowAggExec>() {
-            (exec.window_expr(), &exec.partition_keys)
+            (
+                exec.window_expr(),
+                get_best_fitting_window(
+                    exec.window_expr(),
+                    &window_child,
+                    &exec.partition_keys,
+                )?,
+            )
         } else if let Some(exec) = window_exec.as_any().downcast_ref::<WindowAggExec>() {
-            (exec.window_expr(), &exec.partition_keys)
+            (
+                exec.window_expr(),
+                get_best_fitting_window(
+                    exec.window_expr(),
+                    &window_child,
+                    &exec.partition_keys,
+                )?,
+            )
         } else {
             return plan_err!(
                 "Expects to receive either WindowAggExec of BoundedWindowAggExec"
             );
         };
     let partitionby_exprs = window_expr[0].partition_by();
-    let orderby_sort_keys = window_expr[0].order_by();
-
-    // search_flags stores return value of the can_skip_sort.
-    // `None` case represents `SortExec` cannot be removed.
-    // `PartitionSearch` mode stores at which mode executor should work to remove
-    // `SortExec` before it,
-    // `bool` stores whether or not we need to reverse window expressions to remove `SortExec`.
-    let mut search_flags = None;
-    for sort_any in sort_tree.get_leaves() {
-        // Variable `sort_any` will either be a `SortExec` or a
-        // `SortPreservingMergeExec`, and both have a single child.
-        // Therefore, we can use the 0th index without loss of generality.
-        let sort_input = &sort_any.children()[0];
-        let flags = can_skip_sort(partitionby_exprs, orderby_sort_keys, sort_input)?;
-        if flags.is_some() && (search_flags.is_none() || search_flags == flags) {
-            search_flags = flags;
-            continue;
-        }
-        // We can not skip the sort, or window reversal requirements are not
-        // uniform; then sort removal is not possible -- we immediately return.
-        return Ok(None);
-    }
-    let (should_reverse, partition_search_mode) = if let Some(search_flags) = search_flags
-    {
-        search_flags
-    } else {
-        // We can not skip the sort return:
-        return Ok(None);
-    };
-    let is_unbounded = unbounded_output(window_exec);
-    if !is_unbounded && partition_search_mode != PartitionSearchMode::Sorted {
-        // Executor has bounded input and `partition_search_mode` is not `PartitionSearchMode::Sorted`
-        // in this case removing the sort is not helpful, return:
-        return Ok(None);
-    };
 
-    let new_window_expr = if should_reverse {
-        window_expr
-            .iter()
-            .map(|e| e.get_reverse_expr())
-            .collect::<Option<Vec<_>>>()
+    if let Some(new_window) = new_window {
+        // We were able to change the window to accommodate the input, use it:
+        Ok(Some(PlanWithCorrespondingSort::new(new_window)))
     } else {
-        Some(window_expr.to_vec())
-    };
-    if let Some(window_expr) = new_window_expr {
-        let requires_single_partition = matches!(
-            window_exec.required_input_distribution()[sort_tree.idx],
-            Distribution::SinglePartition
-        );
-        let mut new_child = remove_corresponding_sort_from_sub_plan(
-            sort_tree,
-            requires_single_partition,
-        )?;
-        let new_schema = new_child.schema();
+        // We were unable to change the window to accommodate the input, so we
+        // will insert a sort.
+        let reqs = window_exec
+            .required_input_ordering()
+            .swap_remove(0)
+            .unwrap_or(vec![]);
+        let sort_expr = PhysicalSortRequirement::to_sort_exprs(reqs);
+        // Satisfy the ordering requirement so that the window can run:
+        add_sort_above(&mut window_child, sort_expr, None)?;
 
         let uses_bounded_memory = window_expr.iter().all(|e| e.uses_bounded_memory());
-        // If all window expressions can run with bounded memory, choose the
-        // bounded window variant:
-        let new_plan = if uses_bounded_memory {
+        let input_schema = window_child.schema();
+        let new_window = if uses_bounded_memory {
             Arc::new(BoundedWindowAggExec::try_new(
-                window_expr,
-                new_child,
-                new_schema,
-                partition_keys.to_vec(),
-                partition_search_mode,
+                window_expr.to_vec(),
+                window_child,
+                input_schema,
+                partitionby_exprs.to_vec(),
+                PartitionSearchMode::Sorted,
             )?) as _
         } else {
-            if partition_search_mode != PartitionSearchMode::Sorted {
-                // For `WindowAggExec` to work correctly PARTITION BY columns should be sorted.
-                // Hence, if `partition_search_mode` is not `PartitionSearchMode::Sorted` we should convert
-                // input ordering such that it can work with PartitionSearchMode::Sorted (add `SortExec`).
-                // Effectively `WindowAggExec` works only in PartitionSearchMode::Sorted mode.
-                let reqs = window_exec
-                    .required_input_ordering()
-                    .swap_remove(0)
-                    .unwrap_or(vec![]);
-                let sort_expr = PhysicalSortRequirement::to_sort_exprs(reqs);
-                add_sort_above(&mut new_child, sort_expr, None)?;
-            };
             Arc::new(WindowAggExec::try_new(
-                window_expr,
-                new_child,
-                new_schema,
-                partition_keys.to_vec(),
+                window_expr.to_vec(),
+                window_child,
+                input_schema,
+                partitionby_exprs.to_vec(),
             )?) as _
         };
-        return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
+        Ok(Some(PlanWithCorrespondingSort::new(new_window)))
     }
-    Ok(None)
 }
 
 /// Updates child to remove the unnecessary [`CoalescePartitionsExec`] below it.
@@ -784,155 +747,12 @@ fn get_sort_exprs(
     }
 }
 
-/// Compares physical ordering (output ordering of input executor) with
-/// `partitionby_exprs` and `orderby_keys`
-/// to decide whether existing ordering is sufficient to run current window executor.
-/// A `None` return value indicates that we can not remove the sort in question (input ordering is not
-/// sufficient to run current window executor).
-/// A `Some((bool, PartitionSearchMode))` value indicates window executor can be run with existing input ordering
-/// (Hence we can remove [`SortExec`] before it).
-/// `bool` represents whether we should reverse window executor to remove [`SortExec`] before it.
-/// `PartitionSearchMode` represents, in which mode Window Executor should work with existing ordering.
-fn can_skip_sort(
-    partitionby_exprs: &[Arc<dyn PhysicalExpr>],
-    orderby_keys: &[PhysicalSortExpr],
-    input: &Arc<dyn ExecutionPlan>,
-) -> Result<Option<(bool, PartitionSearchMode)>> {
-    let physical_ordering = if let Some(physical_ordering) = input.output_ordering() {
-        physical_ordering
-    } else {
-        // If there is no physical ordering, there is no way to remove a
-        // sort, so immediately return.
-        return Ok(None);
-    };
-    let orderby_exprs = convert_to_expr(orderby_keys);
-    let physical_ordering_exprs = convert_to_expr(physical_ordering);
-    let equal_properties = || input.equivalence_properties();
-    // indices of the order by expressions among input ordering expressions
-    let ob_indices = get_indices_of_matching_exprs(
-        &orderby_exprs,
-        &physical_ordering_exprs,
-        equal_properties,
-    );
-    if ob_indices.len() != orderby_exprs.len() {
-        // If all order by expressions are not in the input ordering,
-        // there is no way to remove a sort -- immediately return:
-        return Ok(None);
-    }
-    // indices of the partition by expressions among input ordering expressions
-    let pb_indices = get_indices_of_matching_exprs(
-        partitionby_exprs,
-        &physical_ordering_exprs,
-        equal_properties,
-    );
-    let ordered_merged_indices = merge_and_order_indices(&pb_indices, &ob_indices);
-    // Indices of order by columns that doesn't seen in partition by
-    // Equivalently (Order by columns) ∖ (Partition by columns) where `∖` represents set difference.
-    let unique_ob_indices = set_difference(&ob_indices, &pb_indices);
-    if !is_sorted(&unique_ob_indices) {
-        // ORDER BY indices should be ascending ordered
-        return Ok(None);
-    }
-    let first_n = longest_consecutive_prefix(ordered_merged_indices);
-    let furthest_ob_index = *unique_ob_indices.last().unwrap_or(&0);
-    // Cannot skip sort if last order by index is not within consecutive prefix.
-    // For instance, if input is ordered by a,b,c,d
-    // for expression `PARTITION BY a, ORDER BY b, d`, `first_n` would be 2 (meaning a, b defines a prefix for input ordering)
-    // Whereas `furthest_ob_index` would be 3 (column d occurs at the 3rd index of the existing ordering.)
-    // Hence existing ordering is not sufficient to run current Executor.
-    // However, for expression `PARTITION BY a, ORDER BY b, c, d`, `first_n` would be 4 (meaning a, b, c, d defines a prefix for input ordering)
-    // Similarly, `furthest_ob_index` would be 3 (column d occurs at the 3rd index of the existing ordering.)
-    // Hence existing ordering would be sufficient to run current Executor.
-    if first_n <= furthest_ob_index {
-        return Ok(None);
-    }
-    let input_orderby_columns = get_at_indices(physical_ordering, &unique_ob_indices)?;
-    let expected_orderby_columns =
-        get_at_indices(orderby_keys, find_indices(&ob_indices, &unique_ob_indices)?)?;
-    let should_reverse = if let Some(should_reverse) = check_alignments(
-        &input.schema(),
-        &input_orderby_columns,
-        &expected_orderby_columns,
-    )? {
-        should_reverse
-    } else {
-        // If ordering directions are not aligned. We cannot calculate result without changing existing ordering.
-        return Ok(None);
-    };
-
-    let ordered_pb_indices = pb_indices.iter().copied().sorted().collect::<Vec<_>>();
-    // Determine how many elements in the partition by columns defines a consecutive range from zero.
-    let first_n = longest_consecutive_prefix(&ordered_pb_indices);
-    let mode = if first_n == partitionby_exprs.len() {
-        // All of the partition by columns defines a consecutive range from zero.
-        PartitionSearchMode::Sorted
-    } else if first_n > 0 {
-        // All of the partition by columns defines a consecutive range from zero.
-        let ordered_range = &ordered_pb_indices[0..first_n];
-        let input_pb_exprs = get_at_indices(&physical_ordering_exprs, ordered_range)?;
-        let partially_ordered_indices = get_indices_of_matching_exprs(
-            &input_pb_exprs,
-            partitionby_exprs,
-            equal_properties,
-        );
-        PartitionSearchMode::PartiallySorted(partially_ordered_indices)
-    } else {
-        // None of the partition by columns defines a consecutive range from zero.
-        PartitionSearchMode::Linear
-    };
-
-    Ok(Some((should_reverse, mode)))
-}
-
-fn check_alignments(
-    schema: &SchemaRef,
-    physical_ordering: &[PhysicalSortExpr],
-    required: &[PhysicalSortExpr],
-) -> Result<Option<bool>> {
-    let res = izip!(physical_ordering, required)
-        .map(|(lhs, rhs)| check_alignment(schema, lhs, rhs))
-        .collect::<Result<Option<Vec<_>>>>()?;
-    Ok(if let Some(res) = res {
-        if !res.is_empty() {
-            let first = res[0];
-            let all_same = res.into_iter().all(|elem| elem == first);
-            all_same.then_some(first)
-        } else {
-            Some(false)
-        }
-    } else {
-        // Cannot skip some of the requirements in the input.
-        None
-    })
-}
-
-/// Compares `physical_ordering` and `required` ordering, decides whether
-/// alignments match. A `None` return value indicates that current column is
-/// not aligned. A `Some(bool)` value indicates otherwise, and signals whether
-/// we should reverse the window expression in order to avoid sorting.
-fn check_alignment(
-    input_schema: &SchemaRef,
-    physical_ordering: &PhysicalSortExpr,
-    required: &PhysicalSortExpr,
-) -> Result<Option<bool>> {
-    Ok(if required.expr.eq(&physical_ordering.expr) {
-        let physical_opts = physical_ordering.options;
-        let required_opts = required.options;
-        if required.expr.nullable(input_schema)? {
-            let reverse = physical_opts == !required_opts;
-            (reverse || physical_opts == required_opts).then_some(reverse)
-        } else {
-            // If the column is not nullable, NULLS FIRST/LAST is not important.
-            Some(physical_opts.descending != required_opts.descending)
-        }
-    } else {
-        None
-    })
-}
-
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use super::*;
+    use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
     use crate::physical_optimizer::test_utils::{
         aggregate_exec, bounded_window_exec, coalesce_batches_exec,
         coalesce_partitions_exec, filter_exec, global_limit_exec, hash_join_exec,
@@ -940,10 +760,8 @@ mod tests {
         repartition_exec, sort_exec, sort_expr, sort_expr_options, sort_merge_join_exec,
         sort_preserving_merge_exec, union_exec,
     };
+    use crate::physical_optimizer::utils::get_plan_string;
     use crate::physical_plan::repartition::RepartitionExec;
-    use crate::physical_plan::windows::PartitionSearchMode::{
-        Linear, PartiallySorted, Sorted,
-    };
     use crate::physical_plan::{displayable, Partitioning};
     use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::csv_exec_sorted;
@@ -954,11 +772,6 @@ mod tests {
     use datafusion_expr::JoinType;
     use datafusion_physical_expr::expressions::Column;
     use datafusion_physical_expr::expressions::{col, NotExpr};
-    use datafusion_physical_expr::PhysicalSortExpr;
-
-    use crate::physical_optimizer::enforce_distribution::EnforceDistribution;
-    use crate::physical_optimizer::utils::get_plan_string;
-    use std::sync::Arc;
 
     fn create_test_schema() -> Result<SchemaRef> {
         let nullable_column = Field::new("nullable_col", DataType::Int32, true);
@@ -986,364 +799,6 @@ mod tests {
         Ok(schema)
     }
 
-    #[tokio::test]
-    async fn test_is_column_aligned_nullable() -> Result<()> {
-        let schema = create_test_schema()?;
-        let params = vec![
-            ((true, true), (false, false), Some(true)),
-            ((true, true), (false, true), None),
-            ((true, true), (true, false), None),
-            ((true, false), (false, true), Some(true)),
-            ((true, false), (false, false), None),
-            ((true, false), (true, true), None),
-        ];
-        for (
-            (physical_desc, physical_nulls_first),
-            (req_desc, req_nulls_first),
-            expected,
-        ) in params
-        {
-            let physical_ordering = PhysicalSortExpr {
-                expr: col("nullable_col", &schema)?,
-                options: SortOptions {
-                    descending: physical_desc,
-                    nulls_first: physical_nulls_first,
-                },
-            };
-            let required_ordering = PhysicalSortExpr {
-                expr: col("nullable_col", &schema)?,
-                options: SortOptions {
-                    descending: req_desc,
-                    nulls_first: req_nulls_first,
-                },
-            };
-            let res = check_alignment(&schema, &physical_ordering, &required_ordering)?;
-            assert_eq!(res, expected);
-        }
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_is_column_aligned_non_nullable() -> Result<()> {
-        let schema = create_test_schema()?;
-
-        let params = vec![
-            ((true, true), (false, false), Some(true)),
-            ((true, true), (false, true), Some(true)),
-            ((true, true), (true, false), Some(false)),
-            ((true, false), (false, true), Some(true)),
-            ((true, false), (false, false), Some(true)),
-            ((true, false), (true, true), Some(false)),
-        ];
-        for (
-            (physical_desc, physical_nulls_first),
-            (req_desc, req_nulls_first),
-            expected,
-        ) in params
-        {
-            let physical_ordering = PhysicalSortExpr {
-                expr: col("non_nullable_col", &schema)?,
-                options: SortOptions {
-                    descending: physical_desc,
-                    nulls_first: physical_nulls_first,
-                },
-            };
-            let required_ordering = PhysicalSortExpr {
-                expr: col("non_nullable_col", &schema)?,
-                options: SortOptions {
-                    descending: req_desc,
-                    nulls_first: req_nulls_first,
-                },
-            };
-            let res = check_alignment(&schema, &physical_ordering, &required_ordering)?;
-            assert_eq!(res, expected);
-        }
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_can_skip_ordering_exhaustive() -> Result<()> {
-        let test_schema = create_test_schema3()?;
-        // Columns a,c are nullable whereas b,d are not nullable.
-        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC NULLS FIRST, d ASC NULLS FIRST
-        // Column e is not ordered.
-        let sort_exprs = vec![
-            sort_expr("a", &test_schema),
-            sort_expr("b", &test_schema),
-            sort_expr("c", &test_schema),
-            sort_expr("d", &test_schema),
-        ];
-        let exec_unbounded = csv_exec_sorted(&test_schema, sort_exprs, true);
-
-        // test cases consists of vector of tuples. Where each tuple represents a single test case.
-        // First field in the tuple is Vec<str> where each element in the vector represents PARTITION BY columns
-        // For instance `vec!["a", "b"]` corresponds to PARTITION BY a, b
-        // Second field in the tuple is Vec<str> where each element in the vector represents ORDER BY columns
-        // For instance, vec!["c"], corresponds to ORDER BY c ASC NULLS FIRST, (ordering is default ordering. We do not check
-        // for reversibility in this test).
-        // Third field in the tuple is Option<PartitionSearchMode>, which corresponds to expected algorithm mode.
-        // None represents that existing ordering is not sufficient to run executor with any one of the algorithms
-        // (We need to add SortExec to be able to run it).
-        // Some(PartitionSearchMode) represents, we can run algorithm with existing ordering; and algorithm should work in
-        // PartitionSearchMode.
-        let test_cases = vec![
-            (vec!["a"], vec!["a"], Some(Sorted)),
-            (vec!["a"], vec!["b"], Some(Sorted)),
-            (vec!["a"], vec!["c"], None),
-            (vec!["a"], vec!["a", "b"], Some(Sorted)),
-            (vec!["a"], vec!["b", "c"], Some(Sorted)),
-            (vec!["a"], vec!["a", "c"], None),
-            (vec!["a"], vec!["a", "b", "c"], Some(Sorted)),
-            (vec!["b"], vec!["a"], Some(Linear)),
-            (vec!["b"], vec!["b"], None),
-            (vec!["b"], vec!["c"], None),
-            (vec!["b"], vec!["a", "b"], Some(Linear)),
-            (vec!["b"], vec!["b", "c"], None),
-            (vec!["b"], vec!["a", "c"], Some(Linear)),
-            (vec!["b"], vec!["a", "b", "c"], Some(Linear)),
-            (vec!["c"], vec!["a"], Some(Linear)),
-            (vec!["c"], vec!["b"], None),
-            (vec!["c"], vec!["c"], None),
-            (vec!["c"], vec!["a", "b"], Some(Linear)),
-            (vec!["c"], vec!["b", "c"], None),
-            (vec!["c"], vec!["a", "c"], Some(Linear)),
-            (vec!["c"], vec!["a", "b", "c"], Some(Linear)),
-            (vec!["b", "a"], vec!["a"], Some(Sorted)),
-            (vec!["b", "a"], vec!["b"], Some(Sorted)),
-            (vec!["b", "a"], vec!["c"], Some(Sorted)),
-            (vec!["b", "a"], vec!["a", "b"], Some(Sorted)),
-            (vec!["b", "a"], vec!["b", "c"], Some(Sorted)),
-            (vec!["b", "a"], vec!["a", "c"], Some(Sorted)),
-            (vec!["b", "a"], vec!["a", "b", "c"], Some(Sorted)),
-            (vec!["c", "b"], vec!["a"], Some(Linear)),
-            (vec!["c", "b"], vec!["b"], None),
-            (vec!["c", "b"], vec!["c"], None),
-            (vec!["c", "b"], vec!["a", "b"], Some(Linear)),
-            (vec!["c", "b"], vec!["b", "c"], None),
-            (vec!["c", "b"], vec!["a", "c"], Some(Linear)),
-            (vec!["c", "b"], vec!["a", "b", "c"], Some(Linear)),
-            (vec!["c", "a"], vec!["a"], Some(PartiallySorted(vec![1]))),
-            (vec!["c", "a"], vec!["b"], Some(PartiallySorted(vec![1]))),
-            (vec!["c", "a"], vec!["c"], Some(PartiallySorted(vec![1]))),
-            (
-                vec!["c", "a"],
-                vec!["a", "b"],
-                Some(PartiallySorted(vec![1])),
-            ),
-            (
-                vec!["c", "a"],
-                vec!["b", "c"],
-                Some(PartiallySorted(vec![1])),
-            ),
-            (
-                vec!["c", "a"],
-                vec!["a", "c"],
-                Some(PartiallySorted(vec![1])),
-            ),
-            (
-                vec!["c", "a"],
-                vec!["a", "b", "c"],
-                Some(PartiallySorted(vec![1])),
-            ),
-            (vec!["c", "b", "a"], vec!["a"], Some(Sorted)),
-            (vec!["c", "b", "a"], vec!["b"], Some(Sorted)),
-            (vec!["c", "b", "a"], vec!["c"], Some(Sorted)),
-            (vec!["c", "b", "a"], vec!["a", "b"], Some(Sorted)),
-            (vec!["c", "b", "a"], vec!["b", "c"], Some(Sorted)),
-            (vec!["c", "b", "a"], vec!["a", "c"], Some(Sorted)),
-            (vec!["c", "b", "a"], vec!["a", "b", "c"], Some(Sorted)),
-        ];
-        for (case_idx, test_case) in test_cases.iter().enumerate() {
-            let (partition_by_columns, order_by_params, expected) = &test_case;
-            let mut partition_by_exprs = vec![];
-            for col_name in partition_by_columns {
-                partition_by_exprs.push(col(col_name, &test_schema)?);
-            }
-
-            let mut order_by_exprs = vec![];
-            for col_name in order_by_params {
-                let expr = col(col_name, &test_schema)?;
-                // Give default ordering, this is same with input ordering 
direction
-                // In this test we do check for reversibility.
-                let options = SortOptions::default();
-                order_by_exprs.push(PhysicalSortExpr { expr, options });
-            }
-            let res =
-                can_skip_sort(&partition_by_exprs, &order_by_exprs, 
&exec_unbounded)?;
-            // Since reversibility is not important in this test. Convert 
Option<(bool, PartitionSearchMode)> to Option<PartitionSearchMode>
-            let res = res.map(|(_, mode)| mode);
-            assert_eq!(
-                res, *expected,
-                "Unexpected result for in unbounded test case#: {case_idx:?}, 
case: {test_case:?}"
-            );
-        }
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_can_skip_ordering() -> Result<()> {
-        let test_schema = create_test_schema3()?;
-        // Columns a,c are nullable whereas b,d are not nullable.
-        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC 
NULLS FIRST, d ASC NULLS FIRST
-        // Column e is not ordered.
-        let sort_exprs = vec![
-            sort_expr("a", &test_schema),
-            sort_expr("b", &test_schema),
-            sort_expr("c", &test_schema),
-            sort_expr("d", &test_schema),
-        ];
-        let exec_unbounded = csv_exec_sorted(&test_schema, sort_exprs, true);
-
-        // test cases consists of vector of tuples. Where each tuple 
represents a single test case.
-        // First field in the tuple is Vec<str> where each element in the 
vector represents PARTITION BY columns
-        // For instance `vec!["a", "b"]` corresponds to PARTITION BY a, b
-        // Second field in the tuple is Vec<(str, bool, bool)> where each 
element in the vector represents ORDER BY columns
-        // For instance, vec![("c", false, false)], corresponds to ORDER BY c 
ASC NULLS LAST,
-        // similarly, vec![("c", true, true)], corresponds to ORDER BY c DESC 
NULLS FIRST,
-        // Third field in the tuple is Option<(bool, PartitionSearchMode)>, 
which corresponds to expected result.
-        // None represents that existing ordering is not sufficient to run 
executor with any one of the algorithms
-        // (We need to add SortExec to be able to run it).
-        // Some((bool, PartitionSearchMode)) represents, we can run algorithm 
with existing ordering. Algorithm should work in
-        // PartitionSearchMode, bool field represents whether we should 
reverse window expressions to run executor with existing ordering.
-        // For instance, `Some((false, PartitionSearchMode::Sorted))`, 
represents that we shouldn't reverse window expressions. And algorithm
-        // should work in Sorted mode to work with existing ordering.
-        let test_cases = vec![
-            // PARTITION BY a, b ORDER BY c ASC NULLS LAST
-            (vec!["a", "b"], vec![("c", false, false)], None),
-            // ORDER BY c ASC NULLS FIRST
-            (vec![], vec![("c", false, true)], None),
-            // PARTITION BY b, ORDER BY c ASC NULLS FIRST
-            (vec!["b"], vec![("c", false, true)], None),
-            // PARTITION BY a, ORDER BY c ASC NULLS FIRST
-            (vec!["a"], vec![("c", false, true)], None),
-            // PARTITION BY b, ORDER BY c ASC NULLS FIRST
-            (
-                vec!["a", "b"],
-                vec![("c", false, true), ("e", false, true)],
-                None,
-            ),
-            // PARTITION BY a, ORDER BY b ASC NULLS FIRST
-            (vec!["a"], vec![("b", false, true)], Some((false, Sorted))),
-            // PARTITION BY a, ORDER BY a ASC NULLS FIRST
-            (vec!["a"], vec![("a", false, true)], Some((false, Sorted))),
-            // PARTITION BY a, ORDER BY a ASC NULLS LAST
-            (vec!["a"], vec![("a", false, false)], Some((false, Sorted))),
-            // PARTITION BY a, ORDER BY a DESC NULLS FIRST
-            (vec!["a"], vec![("a", true, true)], Some((false, Sorted))),
-            // PARTITION BY a, ORDER BY a DESC NULLS LAST
-            (vec!["a"], vec![("a", true, false)], Some((false, Sorted))),
-            // PARTITION BY a, ORDER BY b ASC NULLS LAST
-            (vec!["a"], vec![("b", false, false)], Some((false, Sorted))),
-            // PARTITION BY a, ORDER BY b DESC NULLS LAST
-            (vec!["a"], vec![("b", true, false)], Some((true, Sorted))),
-            // PARTITION BY a, b ORDER BY c ASC NULLS FIRST
-            (
-                vec!["a", "b"],
-                vec![("c", false, true)],
-                Some((false, Sorted)),
-            ),
-            // PARTITION BY b, a ORDER BY c ASC NULLS FIRST
-            (
-                vec!["b", "a"],
-                vec![("c", false, true)],
-                Some((false, Sorted)),
-            ),
-            // PARTITION BY a, b ORDER BY c DESC NULLS LAST
-            (
-                vec!["a", "b"],
-                vec![("c", true, false)],
-                Some((true, Sorted)),
-            ),
-            // PARTITION BY e ORDER BY a ASC NULLS FIRST
-            (
-                vec!["e"],
-                vec![("a", false, true)],
-                // For unbounded, expects to work in Linear mode. Shouldn't 
reverse window function.
-                Some((false, Linear)),
-            ),
-            // PARTITION BY b, c ORDER BY a ASC NULLS FIRST, c ASC NULLS FIRST
-            (
-                vec!["b", "c"],
-                vec![("a", false, true), ("c", false, true)],
-                Some((false, Linear)),
-            ),
-            // PARTITION BY b ORDER BY a ASC NULLS FIRST
-            (vec!["b"], vec![("a", false, true)], Some((false, Linear))),
-            // PARTITION BY a, e ORDER BY b ASC NULLS FIRST
-            (
-                vec!["a", "e"],
-                vec![("b", false, true)],
-                Some((false, PartiallySorted(vec![0]))),
-            ),
-            // PARTITION BY a, c ORDER BY b ASC NULLS FIRST
-            (
-                vec!["a", "c"],
-                vec![("b", false, true)],
-                Some((false, PartiallySorted(vec![0]))),
-            ),
-            // PARTITION BY c, a ORDER BY b ASC NULLS FIRST
-            (
-                vec!["c", "a"],
-                vec![("b", false, true)],
-                Some((false, PartiallySorted(vec![1]))),
-            ),
-            // PARTITION BY d, b, a ORDER BY c ASC NULLS FIRST
-            (
-                vec!["d", "b", "a"],
-                vec![("c", false, true)],
-                Some((false, PartiallySorted(vec![2, 1]))),
-            ),
-            // PARTITION BY e, b, a ORDER BY c ASC NULLS FIRST
-            (
-                vec!["e", "b", "a"],
-                vec![("c", false, true)],
-                Some((false, PartiallySorted(vec![2, 1]))),
-            ),
-            // PARTITION BY d, a ORDER BY b ASC NULLS FIRST
-            (
-                vec!["d", "a"],
-                vec![("b", false, true)],
-                Some((false, PartiallySorted(vec![1]))),
-            ),
-            // PARTITION BY b, ORDER BY b, a ASC NULLS FIRST
-            (
-                vec!["a"],
-                vec![("b", false, true), ("a", false, true)],
-                Some((false, Sorted)),
-            ),
-            // ORDER BY b, a ASC NULLS FIRST
-            (vec![], vec![("b", false, true), ("a", false, true)], None),
-        ];
-        for (case_idx, test_case) in test_cases.iter().enumerate() {
-            let (partition_by_columns, order_by_params, expected) = &test_case;
-            let mut partition_by_exprs = vec![];
-            for col_name in partition_by_columns {
-                partition_by_exprs.push(col(col_name, &test_schema)?);
-            }
-
-            let mut order_by_exprs = vec![];
-            for (col_name, descending, nulls_first) in order_by_params {
-                let expr = col(col_name, &test_schema)?;
-                let options = SortOptions {
-                    descending: *descending,
-                    nulls_first: *nulls_first,
-                };
-                order_by_exprs.push(PhysicalSortExpr { expr, options });
-            }
-
-            assert_eq!(
-                can_skip_sort(&partition_by_exprs, &order_by_exprs, 
&exec_unbounded)?,
-                *expected,
-                "Unexpected result for in unbounded test case#: {case_idx:?}, 
case: {test_case:?}"
-            );
-        }
-
-        Ok(())
-    }
-
     /// Runs the sort enforcement optimizer and asserts the plan
     /// against the original and expected plans
     ///
diff --git 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index e223e43cb3..b0ae199a2d 100644
--- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -19,10 +19,10 @@
 //! order-preserving variants when it is helpful; either in terms of
 //! performance or to accommodate unbounded streams by fixing the pipeline.
 
+use std::sync::Arc;
+
 use crate::error::Result;
-use crate::physical_optimizer::utils::{
-    is_coalesce_partitions, is_sort, unbounded_output, ExecTree,
-};
+use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort, 
ExecTree};
 use crate::physical_plan::repartition::RepartitionExec;
 use 
crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
@@ -32,8 +32,7 @@ use super::utils::is_repartition;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_physical_expr::utils::ordering_satisfy;
-
-use std::sync::Arc;
+use datafusion_physical_plan::unbounded_output;
 
 /// For a given `plan`, this object carries the information one needs from its
 /// descendants to decide whether it is beneficial to replace order-losing (but
diff --git a/datafusion/core/src/physical_optimizer/utils.rs 
b/datafusion/core/src/physical_optimizer/utils.rs
index b4dd75e586..21c976e07a 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -17,9 +17,6 @@
 
 //! Collection of utility functions that are leveraged by the query optimizer 
rules
 
-use itertools::concat;
-use std::borrow::Borrow;
-use std::collections::HashSet;
 use std::fmt;
 use std::fmt::Formatter;
 use std::sync::Arc;
@@ -34,7 +31,6 @@ use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use crate::physical_plan::{displayable, ExecutionPlan};
 
-use datafusion_common::DataFusionError;
 use datafusion_physical_expr::utils::ordering_satisfy;
 use datafusion_physical_expr::PhysicalSortExpr;
 
@@ -75,30 +71,6 @@ impl ExecTree {
             children,
         }
     }
-
-    /// This function returns the executors at the leaves of the tree.
-    pub fn get_leaves(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        if self.children.is_empty() {
-            vec![self.plan.clone()]
-        } else {
-            concat(self.children.iter().map(|e| e.get_leaves()))
-        }
-    }
-}
-
-// Get output (un)boundedness information for the given `plan`.
-pub(crate) fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
-    let result = if plan.children().is_empty() {
-        plan.unbounded_output(&[])
-    } else {
-        let children_unbounded_output = plan
-            .children()
-            .iter()
-            .map(unbounded_output)
-            .collect::<Vec<_>>();
-        plan.unbounded_output(&children_unbounded_output)
-    };
-    result.unwrap_or(true)
 }
 
 /// This utility function adds a `SortExec` above an operator according to the
@@ -126,64 +98,6 @@ pub fn add_sort_above(
     Ok(())
 }
 
-/// Find indices of each element in `targets` inside `items`. If one of the
-/// elements is absent in `items`, returns an error.
-pub fn find_indices<T: PartialEq, S: Borrow<T>>(
-    items: &[T],
-    targets: impl IntoIterator<Item = S>,
-) -> Result<Vec<usize>> {
-    targets
-        .into_iter()
-        .map(|target| items.iter().position(|e| target.borrow().eq(e)))
-        .collect::<Option<_>>()
-        .ok_or_else(|| DataFusionError::Execution("Target not 
found".to_string()))
-}
-
-/// Merges collections `first` and `second`, removes duplicates and sorts the
-/// result, returning it as a [`Vec`].
-pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
-    first: impl IntoIterator<Item = T>,
-    second: impl IntoIterator<Item = S>,
-) -> Vec<usize> {
-    let mut result: Vec<_> = first
-        .into_iter()
-        .map(|e| *e.borrow())
-        .chain(second.into_iter().map(|e| *e.borrow()))
-        .collect::<HashSet<_>>()
-        .into_iter()
-        .collect();
-    result.sort();
-    result
-}
-
-/// Checks whether the given index sequence is monotonically non-decreasing.
-pub fn is_sorted<T: Borrow<usize>>(sequence: impl IntoIterator<Item = T>) -> 
bool {
-    // TODO: Remove this function when `is_sorted` graduates from Rust nightly.
-    let mut previous = 0;
-    for item in sequence.into_iter() {
-        let current = *item.borrow();
-        if current < previous {
-            return false;
-        }
-        previous = current;
-    }
-    true
-}
-
-/// Calculates the set difference between sequences `first` and `second`,
-/// returning the result as a [`Vec`]. Preserves the ordering of `first`.
-pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
-    first: impl IntoIterator<Item = T>,
-    second: impl IntoIterator<Item = S>,
-) -> Vec<usize> {
-    let set: HashSet<_> = second.into_iter().map(|e| *e.borrow()).collect();
-    first
-        .into_iter()
-        .map(|e| *e.borrow())
-        .filter(|e| !set.contains(e))
-        .collect()
-}
-
 /// Checks whether the given operator is a limit;
 /// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`].
 pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
@@ -227,53 +141,3 @@ pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> 
Vec<String> {
     let actual: Vec<&str> = formatted.trim().lines().collect();
     actual.iter().map(|elem| elem.to_string()).collect()
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[tokio::test]
-    async fn test_find_indices() -> Result<()> {
-        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
-        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
-        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
-        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
-        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_merge_and_order_indices() {
-        assert_eq!(
-            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
-            vec![0, 1, 3, 4, 5]
-        );
-        // Result should be ordered, even if inputs are not
-        assert_eq!(
-            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
-            vec![0, 1, 3, 4, 5]
-        );
-    }
-
-    #[tokio::test]
-    async fn test_is_sorted() {
-        assert!(is_sorted::<usize>([]));
-        assert!(is_sorted([0]));
-        assert!(is_sorted([0, 3, 4]));
-        assert!(is_sorted([0, 1, 2]));
-        assert!(is_sorted([0, 1, 4]));
-        assert!(is_sorted([0usize; 0]));
-        assert!(is_sorted([1, 2]));
-        assert!(!is_sorted([3, 2]));
-    }
-
-    #[tokio::test]
-    async fn test_set_difference() {
-        assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
-        assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
-        // return value should have same ordering with the in1
-        assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
-        assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
-        assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
-    }
-}
diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index c397fb5d3e..76adf7611d 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -330,6 +330,17 @@ pub fn execute_stream_partitioned(
     Ok(streams)
 }
 
+// Get output (un)boundedness information for the given `plan`.
+pub fn unbounded_output(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    let children_unbounded_output = plan
+        .children()
+        .iter()
+        .map(unbounded_output)
+        .collect::<Vec<_>>();
+    plan.unbounded_output(&children_unbounded_output)
+        .unwrap_or(true)
+}
+
 use datafusion_physical_expr::expressions::Column;
 pub use datafusion_physical_expr::window::WindowExpr;
 pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs 
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index c6211c8061..4108b42205 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -20,6 +20,13 @@
 //! the input data seen so far), which makes it appropriate when processing
 //! infinite inputs.
 
+use std::any::Any;
+use std::cmp::{min, Ordering};
+use std::collections::{HashMap, VecDeque};
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
 use crate::expressions::PhysicalSortExpr;
 use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
 use crate::windows::{
@@ -29,35 +36,20 @@ use crate::{
     ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, 
ExecutionPlan,
     Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics, 
WindowExpr,
 };
-use datafusion_common::{exec_err, plan_err, Result};
-use datafusion_execution::TaskContext;
 
-use ahash::RandomState;
 use arrow::{
     array::{Array, ArrayRef, UInt32Builder},
     compute::{concat, concat_batches, sort_to_indices},
     datatypes::{Schema, SchemaBuilder, SchemaRef},
     record_batch::RecordBatch,
 };
-use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
-use futures::stream::Stream;
-use futures::{ready, StreamExt};
-use hashbrown::raw::RawTable;
-use indexmap::IndexMap;
-use log::debug;
-
-use std::any::Any;
-use std::cmp::{min, Ordering};
-use std::collections::{HashMap, VecDeque};
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
-
 use datafusion_common::utils::{
     evaluate_partition_ranges, get_arrayref_at_indices, get_at_indices,
     get_record_batch_at_indices, get_row_at_idx,
 };
-use datafusion_common::DataFusionError;
+use datafusion_common::{exec_err, plan_err, DataFusionError, Result};
+use datafusion_execution::TaskContext;
+use datafusion_expr::window_state::{PartitionBatchState, WindowAggState};
 use datafusion_expr::ColumnarValue;
 use datafusion_physical_expr::hash_utils::create_hashes;
 use datafusion_physical_expr::window::{
@@ -68,6 +60,13 @@ use datafusion_physical_expr::{
     PhysicalSortRequirement,
 };
 
+use ahash::RandomState;
+use futures::stream::Stream;
+use futures::{ready, StreamExt};
+use hashbrown::raw::RawTable;
+use indexmap::IndexMap;
+use log::debug;
+
 #[derive(Debug, Clone, PartialEq)]
 /// Specifies partition column properties in terms of input ordering
 pub enum PartitionSearchMode {
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index 2a2f8d6d21..0f165f7935 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -17,43 +17,49 @@
 
 //! Physical expressions for window functions
 
+use std::borrow::Borrow;
+use std::convert::TryInto;
+use std::sync::Arc;
+
 use crate::{
     aggregates,
     expressions::{
         cume_dist, dense_rank, lag, lead, percent_rank, rank, Literal, 
NthValue, Ntile,
         PhysicalSortExpr, RowNumber,
     },
-    udaf, ExecutionPlan, PhysicalExpr,
+    udaf, unbounded_output, ExecutionPlan, PhysicalExpr,
 };
+
 use arrow::datatypes::Schema;
 use arrow_schema::{DataType, Field, SchemaRef};
-use datafusion_common::ScalarValue;
-use datafusion_common::{DataFusionError, Result};
+use datafusion_common::utils::{
+    find_indices, get_at_indices, is_sorted, longest_consecutive_prefix,
+    merge_and_order_indices, set_difference,
+};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::{
     window_function::{BuiltInWindowFunction, WindowFunction},
     PartitionEvaluator, WindowFrame, WindowUDF,
 };
 use datafusion_physical_expr::{
+    equivalence::OrderingEquivalenceBuilder,
+    utils::{convert_to_expr, get_indices_of_matching_exprs},
     window::{BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr},
-    AggregateExpr,
+    AggregateExpr, OrderingEquivalenceProperties, PhysicalSortRequirement,
 };
-use std::borrow::Borrow;
-use std::convert::TryInto;
-use std::sync::Arc;
+
+use itertools::{izip, Itertools};
 
 mod bounded_window_agg_exec;
 mod window_agg_exec;
 
 pub use bounded_window_agg_exec::BoundedWindowAggExec;
 pub use bounded_window_agg_exec::PartitionSearchMode;
-use datafusion_common::utils::longest_consecutive_prefix;
-use datafusion_physical_expr::equivalence::OrderingEquivalenceBuilder;
-use datafusion_physical_expr::utils::{convert_to_expr, 
get_indices_of_matching_exprs};
+pub use window_agg_exec::WindowAggExec;
+
 pub use datafusion_physical_expr::window::{
     BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
 };
-use datafusion_physical_expr::{OrderingEquivalenceProperties, 
PhysicalSortRequirement};
-pub use window_agg_exec::WindowAggExec;
 
 /// Create a physical expression for window function
 pub fn create_window_expr(
@@ -355,19 +361,266 @@ pub(crate) fn window_ordering_equivalence(
     }
     builder.build()
 }
+
+/// Constructs the best-fitting windowing operator (a `WindowAggExec` or a
+/// `BoundedWindowAggExec`) for the given `input` according to the specifications
+/// of `window_exprs` and `physical_partition_keys`. Here, best-fitting means
+/// not requiring additional sorting and/or partitioning for the given input.
+/// - A return value of `None` represents that there is no way to construct a
+///   windowing operator that doesn't need additional sorting/partitioning for
+///   the given input. Existing ordering should be changed to run the given
+///   windowing operation.
+/// - A `Some(window exec)` value contains the optimal windowing operator (a
+///   `WindowAggExec` or a `BoundedWindowAggExec`) for the given input.
+pub fn get_best_fitting_window(
+    window_exprs: &[Arc<dyn WindowExpr>],
+    input: &Arc<dyn ExecutionPlan>,
+    // These are the partition keys used during repartitioning.
+    // They are either the same with `window_expr`'s PARTITION BY columns,
+    // or it is empty if partitioning is not desirable for this windowing 
operator.
+    physical_partition_keys: &[Arc<dyn PhysicalExpr>],
+) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+    // Contains at least one window expr and all of the partition by and order 
by sections
+    // of the window_exprs are same.
+    let partitionby_exprs = window_exprs[0].partition_by();
+    let orderby_keys = window_exprs[0].order_by();
+    let (should_reverse, partition_search_mode) =
+        if let Some((should_reverse, partition_search_mode)) =
+            can_skip_sort(partitionby_exprs, orderby_keys, input)?
+        {
+            (should_reverse, partition_search_mode)
+        } else {
+            return Ok(None);
+        };
+    let is_unbounded = unbounded_output(input);
+    if !is_unbounded && partition_search_mode != PartitionSearchMode::Sorted {
+        // Executor has bounded input and `partition_search_mode` is not 
`PartitionSearchMode::Sorted`
+        // in this case removing the sort is not helpful, return:
+        return Ok(None);
+    };
+
+    let window_expr = if should_reverse {
+        if let Some(reversed_window_expr) = window_exprs
+            .iter()
+            .map(|e| e.get_reverse_expr())
+            .collect::<Option<Vec<_>>>()
+        {
+            reversed_window_expr
+        } else {
+            // At least one of the window expressions cannot be reversed.
+            // In this case, the window cannot run with the existing ordering.
+            return Ok(None);
+        }
+    } else {
+        window_exprs.to_vec()
+    };
+
+    // If all window expressions can run with bounded memory, choose the
+    // bounded window variant:
+    if window_expr.iter().all(|e| e.uses_bounded_memory()) {
+        Ok(Some(Arc::new(BoundedWindowAggExec::try_new(
+            window_expr,
+            input.clone(),
+            input.schema(),
+            physical_partition_keys.to_vec(),
+            partition_search_mode,
+        )?) as _))
+    } else if partition_search_mode != PartitionSearchMode::Sorted {
+        // For `WindowAggExec` to work correctly PARTITION BY columns should 
be sorted.
+        // Hence, if `partition_search_mode` is not 
`PartitionSearchMode::Sorted` we should convert
+        // input ordering such that it can work with 
PartitionSearchMode::Sorted (add `SortExec`).
+        // Effectively `WindowAggExec` works only in 
PartitionSearchMode::Sorted mode.
+        Ok(None)
+    } else {
+        Ok(Some(Arc::new(WindowAggExec::try_new(
+            window_expr,
+            input.clone(),
+            input.schema(),
+            physical_partition_keys.to_vec(),
+        )?) as _))
+    }
+}
+
+/// Compares physical ordering (output ordering of the `input` operator) with
+/// `partitionby_exprs` and `orderby_keys` to decide whether existing ordering
+/// is sufficient to run the current window operator.
+/// - A `None` return value indicates that we can not remove the sort in 
question
+///   (input ordering is not sufficient to run current window executor).
+/// - A `Some((bool, PartitionSearchMode))` value indicates that the window 
operator
+///   can run with existing input ordering, so we can remove `SortExec` before 
it.
+/// The `bool` field in the return value represents whether we should reverse 
window
+/// operator to remove `SortExec` before it. The `PartitionSearchMode` field 
represents
+/// the mode this window operator should work in to accommodate the existing ordering.
+fn can_skip_sort(
+    partitionby_exprs: &[Arc<dyn PhysicalExpr>],
+    orderby_keys: &[PhysicalSortExpr],
+    input: &Arc<dyn ExecutionPlan>,
+) -> Result<Option<(bool, PartitionSearchMode)>> {
+    let physical_ordering = if let Some(physical_ordering) = 
input.output_ordering() {
+        physical_ordering
+    } else {
+        // If there is no physical ordering, there is no way to remove a
+        // sort, so immediately return.
+        return Ok(None);
+    };
+    let orderby_exprs = convert_to_expr(orderby_keys);
+    let physical_ordering_exprs = convert_to_expr(physical_ordering);
+    let equal_properties = || input.equivalence_properties();
+    // Get the indices of the ORDER BY expressions among input ordering 
expressions:
+    let ob_indices = get_indices_of_matching_exprs(
+        &orderby_exprs,
+        &physical_ordering_exprs,
+        equal_properties,
+    );
+    if ob_indices.len() != orderby_exprs.len() {
+        // If any ORDER BY expression is missing from the input ordering,
+        // there is no way to remove a sort -- immediately return:
+        return Ok(None);
+    }
+    // Get the indices of the PARTITION BY expressions among input ordering 
expressions:
+    let pb_indices = get_indices_of_matching_exprs(
+        partitionby_exprs,
+        &physical_ordering_exprs,
+        equal_properties,
+    );
+    let ordered_merged_indices = merge_and_order_indices(&pb_indices, 
&ob_indices);
+    // Get the indices of the ORDER BY columns that don't appear in the
+    // PARTITION BY clause; i.e. calculate (ORDER BY columns) ∖ (PARTITION
+    // BY columns) where `∖` represents set difference.
+    let unique_ob_indices = set_difference(&ob_indices, &pb_indices);
+    if !is_sorted(&unique_ob_indices) {
+        // ORDER BY indices should be ascending ordered
+        return Ok(None);
+    }
+    let first_n = longest_consecutive_prefix(ordered_merged_indices);
+    let furthest_ob_index = *unique_ob_indices.last().unwrap_or(&0);
+    // Cannot skip sort if last order by index is not within consecutive 
prefix.
+    // For instance, if input is ordered by a, b, c, d for the expression
+    // `PARTITION BY a, ORDER BY b, d`, then `first_n` would be 2 (meaning a, 
b defines a
+    // prefix for input ordering). However, `furthest_ob_index` would be 3 as 
column d
+    // occurs at the 3rd index of the existing ordering. Hence, existing 
ordering would
+    // not be sufficient to run the current operator.
+    // However, for expression `PARTITION BY a, ORDER BY b, c, d`, `first_n` 
would be 4 (meaning
+    // a, b, c, d defines a prefix for input ordering). Similarly, 
`furthest_ob_index` would be
+    // 3 as column d occurs at the 3rd index of the existing ordering. 
Therefore, the existing
+    // ordering would be sufficient to run the current operator.
+    if first_n <= furthest_ob_index {
+        return Ok(None);
+    }
+    let input_orderby_columns = get_at_indices(physical_ordering, 
&unique_ob_indices)?;
+    let expected_orderby_columns =
+        get_at_indices(orderby_keys, find_indices(&ob_indices, 
&unique_ob_indices)?)?;
+    let should_reverse = if let Some(should_reverse) = check_alignments(
+        &input.schema(),
+        &input_orderby_columns,
+        &expected_orderby_columns,
+    )? {
+        should_reverse
+    } else {
+        // If ordering directions are not aligned, we cannot calculate the
+        // result without changing existing ordering.
+        return Ok(None);
+    };
+
+    let ordered_pb_indices = 
pb_indices.iter().copied().sorted().collect::<Vec<_>>();
+    // Determine how many elements in the PARTITION BY columns defines a 
consecutive range from zero.
+    let first_n = longest_consecutive_prefix(&ordered_pb_indices);
+    let mode = if first_n == partitionby_exprs.len() {
+        // All of the PARTITION BY columns defines a consecutive range from 
zero.
+        PartitionSearchMode::Sorted
+    } else if first_n > 0 {
+        // Only some of the PARTITION BY columns define a consecutive range from zero.
+        let ordered_range = &ordered_pb_indices[0..first_n];
+        let input_pb_exprs = get_at_indices(&physical_ordering_exprs, 
ordered_range)?;
+        let partially_ordered_indices = get_indices_of_matching_exprs(
+            &input_pb_exprs,
+            partitionby_exprs,
+            equal_properties,
+        );
+        PartitionSearchMode::PartiallySorted(partially_ordered_indices)
+    } else {
+        // None of the PARTITION BY columns defines a consecutive range from 
zero.
+        PartitionSearchMode::Linear
+    };
+
+    Ok(Some((should_reverse, mode)))
+}
+
+/// Compares all the orderings in `physical_ordering` and `required`, decides
+/// whether alignments match. A `None` return value indicates that some column
+/// is not aligned or that columns disagree on reversal. A `Some(bool)` value
+/// signals whether we should reverse the window expression to avoid sorting.
+fn check_alignments(
+    schema: &SchemaRef,
+    physical_ordering: &[PhysicalSortExpr],
+    required: &[PhysicalSortExpr],
+) -> Result<Option<bool>> {
+    let result = izip!(physical_ordering, required)
+        .map(|(lhs, rhs)| check_alignment(schema, lhs, rhs))
+        .collect::<Result<Option<Vec<_>>>>()?;
+    Ok(if let Some(res) = result {
+        if !res.is_empty() {
+            let first = res[0];
+            let all_same = res.into_iter().all(|elem| elem == first);
+            all_same.then_some(first)
+        } else {
+            Some(false)
+        }
+    } else {
+        // Cannot skip some of the requirements in the input.
+        None
+    })
+}
+
+/// Compares `physical_ordering` and `required` ordering, decides whether
+/// alignments match. A `None` return value indicates that current column is
+/// not aligned. A `Some(bool)` value indicates otherwise, and signals whether
+/// we should reverse the window expression in order to avoid sorting.
+fn check_alignment(
+    input_schema: &SchemaRef,
+    physical_ordering: &PhysicalSortExpr,
+    required: &PhysicalSortExpr,
+) -> Result<Option<bool>> {
+    Ok(if required.expr.eq(&physical_ordering.expr) {
+        let physical_opts = physical_ordering.options;
+        let required_opts = required.options;
+        if required.expr.nullable(input_schema)? {
+            let reverse = physical_opts == !required_opts;
+            (reverse || physical_opts == required_opts).then_some(reverse)
+        } else {
+            // If the column is not nullable, NULLS FIRST/LAST is not 
important.
+            Some(physical_opts.descending != required_opts.descending)
+        }
+    } else {
+        None
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::aggregates::AggregateFunction;
     use crate::collect;
     use crate::expressions::col;
+    use crate::streaming::StreamingTableExec;
     use crate::test::assert_is_pending;
     use crate::test::exec::{assert_strong_count_converges_to_zero, 
BlockingExec};
+    use crate::windows::PartitionSearchMode::{Linear, PartiallySorted, Sorted};
+
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, SchemaRef};
     use datafusion_execution::TaskContext;
+
     use futures::FutureExt;
 
+    fn create_test_schema() -> Result<SchemaRef> {
+        let nullable_column = Field::new("nullable_col", DataType::Int32, 
true);
+        let non_nullable_column = Field::new("non_nullable_col", 
DataType::Int32, false);
+        let schema = Arc::new(Schema::new(vec![nullable_column, 
non_nullable_column]));
+
+        Ok(schema)
+    }
+
     fn create_test_schema2() -> Result<SchemaRef> {
         let a = Field::new("a", DataType::Int32, true);
         let b = Field::new("b", DataType::Int32, true);
@@ -378,6 +631,51 @@ mod tests {
         Ok(schema)
     }
 
+    // Generate a schema which consists of 5 columns (a, b, c, d, e)
+    fn create_test_schema3() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, false);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, false);
+        let e = Field::new("e", DataType::Int32, false);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+        Ok(schema)
+    }
+
+    /// Makes a `PhysicalSortExpr` with default sort options
+    pub fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+        sort_expr_options(name, schema, SortOptions::default())
+    }
+
+    /// Makes a `PhysicalSortExpr` with the specified sort options
+    pub fn sort_expr_options(
+        name: &str,
+        schema: &Schema,
+        options: SortOptions,
+    ) -> PhysicalSortExpr {
+        PhysicalSortExpr {
+            expr: col(name, schema).unwrap(),
+            options,
+        }
+    }
+
+    /// Creates a sorted Streaming Table exec
+    pub fn streaming_table_exec(
+        schema: &SchemaRef,
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+        infinite_source: bool,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+
+        Ok(Arc::new(StreamingTableExec::try_new(
+            schema.clone(),
+            vec![],
+            None,
+            Some(sort_exprs),
+            infinite_source,
+        )?))
+    }
+
     #[tokio::test]
     async fn test_calc_requirements() -> Result<()> {
         let schema = create_test_schema2()?;
@@ -474,4 +772,362 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_is_column_aligned_nullable() -> Result<()> {
+        let schema = create_test_schema()?;
+        let params = vec![
+            ((true, true), (false, false), Some(true)),
+            ((true, true), (false, true), None),
+            ((true, true), (true, false), None),
+            ((true, false), (false, true), Some(true)),
+            ((true, false), (false, false), None),
+            ((true, false), (true, true), None),
+        ];
+        for (
+            (physical_desc, physical_nulls_first),
+            (req_desc, req_nulls_first),
+            expected,
+        ) in params
+        {
+            let physical_ordering = PhysicalSortExpr {
+                expr: col("nullable_col", &schema)?,
+                options: SortOptions {
+                    descending: physical_desc,
+                    nulls_first: physical_nulls_first,
+                },
+            };
+            let required_ordering = PhysicalSortExpr {
+                expr: col("nullable_col", &schema)?,
+                options: SortOptions {
+                    descending: req_desc,
+                    nulls_first: req_nulls_first,
+                },
+            };
+            let res = check_alignment(&schema, &physical_ordering, 
&required_ordering)?;
+            assert_eq!(res, expected);
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_is_column_aligned_non_nullable() -> Result<()> {
+        let schema = create_test_schema()?;
+
+        let params = vec![
+            ((true, true), (false, false), Some(true)),
+            ((true, true), (false, true), Some(true)),
+            ((true, true), (true, false), Some(false)),
+            ((true, false), (false, true), Some(true)),
+            ((true, false), (false, false), Some(true)),
+            ((true, false), (true, true), Some(false)),
+        ];
+        for (
+            (physical_desc, physical_nulls_first),
+            (req_desc, req_nulls_first),
+            expected,
+        ) in params
+        {
+            let physical_ordering = PhysicalSortExpr {
+                expr: col("non_nullable_col", &schema)?,
+                options: SortOptions {
+                    descending: physical_desc,
+                    nulls_first: physical_nulls_first,
+                },
+            };
+            let required_ordering = PhysicalSortExpr {
+                expr: col("non_nullable_col", &schema)?,
+                options: SortOptions {
+                    descending: req_desc,
+                    nulls_first: req_nulls_first,
+                },
+            };
+            let res = check_alignment(&schema, &physical_ordering, 
&required_ordering)?;
+            assert_eq!(res, expected);
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_can_skip_ordering_exhaustive() -> Result<()> {
+        let test_schema = create_test_schema3()?;
+        // Columns a,c are nullable whereas b,d are not nullable.
+        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC 
NULLS FIRST, d ASC NULLS FIRST
+        // Column e is not ordered.
+        let sort_exprs = vec![
+            sort_expr("a", &test_schema),
+            sort_expr("b", &test_schema),
+            sort_expr("c", &test_schema),
+            sort_expr("d", &test_schema),
+        ];
+        let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs, 
true)?;
+
+        // The test cases are a vector of tuples, where each tuple 
represents a single test case.
+        // The first field of the tuple is a Vec<&str> whose elements are the 
PARTITION BY columns.
+        // For instance, `vec!["a", "b"]` corresponds to PARTITION BY a, b.
+        // The second field of the tuple is a Vec<&str> whose elements are the 
ORDER BY columns.
+        // For instance, `vec!["c"]` corresponds to ORDER BY c ASC NULLS FIRST 
(the default ordering; we do not check
+        // for reversibility in this test).
+        // The third field of the tuple is an Option<PartitionSearchMode>, the 
expected algorithm mode.
+        // None means that the existing ordering is not sufficient to run the 
executor with any of the algorithms
+        // (we need to add a SortExec to be able to run it).
+        // Some(PartitionSearchMode) means that the algorithm can run with the 
existing ordering, and should work in
+        // the given PartitionSearchMode.
+        let test_cases = vec![
+            (vec!["a"], vec!["a"], Some(Sorted)),
+            (vec!["a"], vec!["b"], Some(Sorted)),
+            (vec!["a"], vec!["c"], None),
+            (vec!["a"], vec!["a", "b"], Some(Sorted)),
+            (vec!["a"], vec!["b", "c"], Some(Sorted)),
+            (vec!["a"], vec!["a", "c"], None),
+            (vec!["a"], vec!["a", "b", "c"], Some(Sorted)),
+            (vec!["b"], vec!["a"], Some(Linear)),
+            (vec!["b"], vec!["b"], None),
+            (vec!["b"], vec!["c"], None),
+            (vec!["b"], vec!["a", "b"], Some(Linear)),
+            (vec!["b"], vec!["b", "c"], None),
+            (vec!["b"], vec!["a", "c"], Some(Linear)),
+            (vec!["b"], vec!["a", "b", "c"], Some(Linear)),
+            (vec!["c"], vec!["a"], Some(Linear)),
+            (vec!["c"], vec!["b"], None),
+            (vec!["c"], vec!["c"], None),
+            (vec!["c"], vec!["a", "b"], Some(Linear)),
+            (vec!["c"], vec!["b", "c"], None),
+            (vec!["c"], vec!["a", "c"], Some(Linear)),
+            (vec!["c"], vec!["a", "b", "c"], Some(Linear)),
+            (vec!["b", "a"], vec!["a"], Some(Sorted)),
+            (vec!["b", "a"], vec!["b"], Some(Sorted)),
+            (vec!["b", "a"], vec!["c"], Some(Sorted)),
+            (vec!["b", "a"], vec!["a", "b"], Some(Sorted)),
+            (vec!["b", "a"], vec!["b", "c"], Some(Sorted)),
+            (vec!["b", "a"], vec!["a", "c"], Some(Sorted)),
+            (vec!["b", "a"], vec!["a", "b", "c"], Some(Sorted)),
+            (vec!["c", "b"], vec!["a"], Some(Linear)),
+            (vec!["c", "b"], vec!["b"], None),
+            (vec!["c", "b"], vec!["c"], None),
+            (vec!["c", "b"], vec!["a", "b"], Some(Linear)),
+            (vec!["c", "b"], vec!["b", "c"], None),
+            (vec!["c", "b"], vec!["a", "c"], Some(Linear)),
+            (vec!["c", "b"], vec!["a", "b", "c"], Some(Linear)),
+            (vec!["c", "a"], vec!["a"], Some(PartiallySorted(vec![1]))),
+            (vec!["c", "a"], vec!["b"], Some(PartiallySorted(vec![1]))),
+            (vec!["c", "a"], vec!["c"], Some(PartiallySorted(vec![1]))),
+            (
+                vec!["c", "a"],
+                vec!["a", "b"],
+                Some(PartiallySorted(vec![1])),
+            ),
+            (
+                vec!["c", "a"],
+                vec!["b", "c"],
+                Some(PartiallySorted(vec![1])),
+            ),
+            (
+                vec!["c", "a"],
+                vec!["a", "c"],
+                Some(PartiallySorted(vec![1])),
+            ),
+            (
+                vec!["c", "a"],
+                vec!["a", "b", "c"],
+                Some(PartiallySorted(vec![1])),
+            ),
+            (vec!["c", "b", "a"], vec!["a"], Some(Sorted)),
+            (vec!["c", "b", "a"], vec!["b"], Some(Sorted)),
+            (vec!["c", "b", "a"], vec!["c"], Some(Sorted)),
+            (vec!["c", "b", "a"], vec!["a", "b"], Some(Sorted)),
+            (vec!["c", "b", "a"], vec!["b", "c"], Some(Sorted)),
+            (vec!["c", "b", "a"], vec!["a", "c"], Some(Sorted)),
+            (vec!["c", "b", "a"], vec!["a", "b", "c"], Some(Sorted)),
+        ];
+        for (case_idx, test_case) in test_cases.iter().enumerate() {
+            let (partition_by_columns, order_by_params, expected) = &test_case;
+            let mut partition_by_exprs = vec![];
+            for col_name in partition_by_columns {
+                partition_by_exprs.push(col(col_name, &test_schema)?);
+            }
+
+            let mut order_by_exprs = vec![];
+            for col_name in order_by_params {
+                let expr = col(col_name, &test_schema)?;
+                // Use the default ordering, which matches the input ordering 
direction.
+                // In this test we do not check for reversibility.
+                let options = SortOptions::default();
+                order_by_exprs.push(PhysicalSortExpr { expr, options });
+            }
+            let res =
+                can_skip_sort(&partition_by_exprs, &order_by_exprs, 
&exec_unbounded)?;
+            // Since reversibility is not important in this test, convert 
Option<(bool, PartitionSearchMode)> to Option<PartitionSearchMode>.
+            let res = res.map(|(_, mode)| mode);
+            assert_eq!(
+                res, *expected,
+                "Unexpected result for in unbounded test case#: {case_idx:?}, 
case: {test_case:?}"
+            );
+        }
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_can_skip_ordering() -> Result<()> {
+        let test_schema = create_test_schema3()?;
+        // Columns a,c are nullable whereas b,d are not nullable.
+        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC 
NULLS FIRST, d ASC NULLS FIRST
+        // Column e is not ordered.
+        let sort_exprs = vec![
+            sort_expr("a", &test_schema),
+            sort_expr("b", &test_schema),
+            sort_expr("c", &test_schema),
+            sort_expr("d", &test_schema),
+        ];
+        let exec_unbounded = streaming_table_exec(&test_schema, sort_exprs, 
true)?;
+
+        // The test cases are a vector of tuples, where each tuple 
represents a single test case.
+        // The first field of the tuple is a Vec<&str> whose elements are the 
PARTITION BY columns.
+        // For instance, `vec!["a", "b"]` corresponds to PARTITION BY a, b.
+        // The second field of the tuple is a Vec<(&str, bool, bool)> whose 
elements are the ORDER BY columns as (name, descending, nulls_first).
+        // For instance, `vec![("c", false, false)]` corresponds to ORDER BY c 
ASC NULLS LAST;
+        // similarly, `vec![("c", true, true)]` corresponds to ORDER BY c DESC 
NULLS FIRST.
+        // The third field of the tuple is an Option<(bool, PartitionSearchMode)>, 
the expected result.
+        // None means that the existing ordering is not sufficient to run the 
executor with any of the algorithms
+        // (we need to add a SortExec to be able to run it).
+        // Some((bool, PartitionSearchMode)) means that the algorithm can run 
with the existing ordering. The algorithm should work in the given
+        // PartitionSearchMode; the bool field tells whether we should 
reverse the window expressions to run the executor with the existing ordering.
+        // For instance, `Some((false, PartitionSearchMode::Sorted))` 
means that we shouldn't reverse the window expressions, and that the algorithm
+        // should work in Sorted mode to work with the existing ordering.
+        let test_cases = vec![
+            // PARTITION BY a, b ORDER BY c ASC NULLS LAST
+            (vec!["a", "b"], vec![("c", false, false)], None),
+            // ORDER BY c ASC NULLS FIRST
+            (vec![], vec![("c", false, true)], None),
+            // PARTITION BY b, ORDER BY c ASC NULLS FIRST
+            (vec!["b"], vec![("c", false, true)], None),
+            // PARTITION BY a, ORDER BY c ASC NULLS FIRST
+            (vec!["a"], vec![("c", false, true)], None),
+            // PARTITION BY a, b ORDER BY c ASC NULLS FIRST, e ASC NULLS FIRST
+            (
+                vec!["a", "b"],
+                vec![("c", false, true), ("e", false, true)],
+                None,
+            ),
+            // PARTITION BY a, ORDER BY b ASC NULLS FIRST
+            (vec!["a"], vec![("b", false, true)], Some((false, Sorted))),
+            // PARTITION BY a, ORDER BY a ASC NULLS FIRST
+            (vec!["a"], vec![("a", false, true)], Some((false, Sorted))),
+            // PARTITION BY a, ORDER BY a ASC NULLS LAST
+            (vec!["a"], vec![("a", false, false)], Some((false, Sorted))),
+            // PARTITION BY a, ORDER BY a DESC NULLS FIRST
+            (vec!["a"], vec![("a", true, true)], Some((false, Sorted))),
+            // PARTITION BY a, ORDER BY a DESC NULLS LAST
+            (vec!["a"], vec![("a", true, false)], Some((false, Sorted))),
+            // PARTITION BY a, ORDER BY b ASC NULLS LAST
+            (vec!["a"], vec![("b", false, false)], Some((false, Sorted))),
+            // PARTITION BY a, ORDER BY b DESC NULLS LAST
+            (vec!["a"], vec![("b", true, false)], Some((true, Sorted))),
+            // PARTITION BY a, b ORDER BY c ASC NULLS FIRST
+            (
+                vec!["a", "b"],
+                vec![("c", false, true)],
+                Some((false, Sorted)),
+            ),
+            // PARTITION BY b, a ORDER BY c ASC NULLS FIRST
+            (
+                vec!["b", "a"],
+                vec![("c", false, true)],
+                Some((false, Sorted)),
+            ),
+            // PARTITION BY a, b ORDER BY c DESC NULLS LAST
+            (
+                vec!["a", "b"],
+                vec![("c", true, false)],
+                Some((true, Sorted)),
+            ),
+            // PARTITION BY e ORDER BY a ASC NULLS FIRST
+            (
+                vec!["e"],
+                vec![("a", false, true)],
+                // For unbounded, expects to work in Linear mode. Shouldn't 
reverse window function.
+                Some((false, Linear)),
+            ),
+            // PARTITION BY b, c ORDER BY a ASC NULLS FIRST, c ASC NULLS FIRST
+            (
+                vec!["b", "c"],
+                vec![("a", false, true), ("c", false, true)],
+                Some((false, Linear)),
+            ),
+            // PARTITION BY b ORDER BY a ASC NULLS FIRST
+            (vec!["b"], vec![("a", false, true)], Some((false, Linear))),
+            // PARTITION BY a, e ORDER BY b ASC NULLS FIRST
+            (
+                vec!["a", "e"],
+                vec![("b", false, true)],
+                Some((false, PartiallySorted(vec![0]))),
+            ),
+            // PARTITION BY a, c ORDER BY b ASC NULLS FIRST
+            (
+                vec!["a", "c"],
+                vec![("b", false, true)],
+                Some((false, PartiallySorted(vec![0]))),
+            ),
+            // PARTITION BY c, a ORDER BY b ASC NULLS FIRST
+            (
+                vec!["c", "a"],
+                vec![("b", false, true)],
+                Some((false, PartiallySorted(vec![1]))),
+            ),
+            // PARTITION BY d, b, a ORDER BY c ASC NULLS FIRST
+            (
+                vec!["d", "b", "a"],
+                vec![("c", false, true)],
+                Some((false, PartiallySorted(vec![2, 1]))),
+            ),
+            // PARTITION BY e, b, a ORDER BY c ASC NULLS FIRST
+            (
+                vec!["e", "b", "a"],
+                vec![("c", false, true)],
+                Some((false, PartiallySorted(vec![2, 1]))),
+            ),
+            // PARTITION BY d, a ORDER BY b ASC NULLS FIRST
+            (
+                vec!["d", "a"],
+                vec![("b", false, true)],
+                Some((false, PartiallySorted(vec![1]))),
+            ),
+            // PARTITION BY a, ORDER BY b ASC NULLS FIRST, a ASC NULLS FIRST
+            (
+                vec!["a"],
+                vec![("b", false, true), ("a", false, true)],
+                Some((false, Sorted)),
+            ),
+            // ORDER BY b ASC NULLS FIRST, a ASC NULLS FIRST
+            (vec![], vec![("b", false, true), ("a", false, true)], None),
+        ];
+        for (case_idx, test_case) in test_cases.iter().enumerate() {
+            let (partition_by_columns, order_by_params, expected) = &test_case;
+            let mut partition_by_exprs = vec![];
+            for col_name in partition_by_columns {
+                partition_by_exprs.push(col(col_name, &test_schema)?);
+            }
+
+            let mut order_by_exprs = vec![];
+            for (col_name, descending, nulls_first) in order_by_params {
+                let expr = col(col_name, &test_schema)?;
+                let options = SortOptions {
+                    descending: *descending,
+                    nulls_first: *nulls_first,
+                };
+                order_by_exprs.push(PhysicalSortExpr { expr, options });
+            }
+
+            assert_eq!(
+                can_skip_sort(&partition_by_exprs, &order_by_exprs, 
&exec_unbounded)?,
+                *expected,
+                "Unexpected result for in unbounded test case#: {case_idx:?}, 
case: {test_case:?}"
+            );
+        }
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs 
b/datafusion/physical-plan/src/windows/window_agg_exec.rs
index da43f127f0..b56a9c194c 100644
--- a/datafusion/physical-plan/src/windows/window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs
@@ -17,6 +17,11 @@
 
 //! Stream and channel implementations for window function expressions.
 
+use std::any::Any;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
 use crate::common::transpose;
 use crate::expressions::PhysicalSortExpr;
 use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
@@ -28,6 +33,7 @@ use crate::{
     ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream, Statistics, WindowExpr,
 };
+
 use arrow::compute::{concat, concat_batches};
 use arrow::datatypes::SchemaBuilder;
 use arrow::error::ArrowError;
@@ -37,16 +43,12 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use datafusion_common::utils::{evaluate_partition_ranges, get_at_indices};
-use datafusion_common::Result;
-use datafusion_common::{internal_err, plan_err, DataFusionError};
+use datafusion_common::{internal_err, plan_err, DataFusionError, Result};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{OrderingEquivalenceProperties, 
PhysicalSortRequirement};
+
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
-use std::any::Any;
-use std::pin::Pin;
-use std::sync::Arc;
-use std::task::{Context, Poll};
 
 /// Window execution plan
 #[derive(Debug)]

Reply via email to