This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 741f8f80fd Treat Partition by columns as set for window functions  
(#5951)
741f8f80fd is described below

commit 741f8f80fd802b92049515035aff439247b802a4
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Apr 13 21:02:17 2023 +0300

    Treat Partition by columns as set for window functions  (#5951)
    
    * Change required input ordering to format to not absolutely require 
direction.
    
    * Treat partition by columns as set
    
    * simplifications
    
    * simplifications
    
    * simplifications
    
    * simplifications
    
    * change place of util function, bug fix get_ordered_partition_by_indices
    
    * minor changes
    
    * Simplifications, clone removals
    
    * update comment
    
    * add new test cases
    
    * Address reviews
    
    * address reviews
    
    * resolve clippy errors
    
    ---------
    
    Co-authored-by: Mehmet Ozan Kabak <[email protected]>
---
 datafusion/common/src/utils.rs                     |  55 +++-
 .../src/physical_optimizer/sort_enforcement.rs     | 298 +++++++++++++++------
 datafusion/core/src/physical_optimizer/utils.rs    | 114 +++++++-
 .../windows/bounded_window_agg_exec.rs             |  65 +++--
 datafusion/core/src/physical_plan/windows/mod.rs   | 117 ++++++--
 .../src/physical_plan/windows/window_agg_exec.rs   |  60 +++--
 datafusion/core/src/test/mod.rs                    |  29 +-
 .../core/tests/sqllogictests/test_files/window.slt | 109 +++++++-
 datafusion/physical-expr/src/utils.rs              | 109 ++++++--
 9 files changed, 796 insertions(+), 160 deletions(-)

diff --git a/datafusion/common/src/utils.rs b/datafusion/common/src/utils.rs
index 9b2604d2ee..2f468ff9c3 100644
--- a/datafusion/common/src/utils.rs
+++ b/datafusion/common/src/utils.rs
@@ -27,7 +27,7 @@ use sqlparser::ast::Ident;
 use sqlparser::dialect::GenericDialect;
 use sqlparser::parser::{Parser, ParserError};
 use sqlparser::tokenizer::{Token, TokenWithLocation};
-use std::borrow::Cow;
+use std::borrow::{Borrow, Cow};
 use std::cmp::Ordering;
 use std::ops::Range;
 
@@ -292,10 +292,45 @@ pub(crate) fn parse_identifiers_normalized(s: &str) -> 
Vec<String> {
         .collect::<Vec<_>>()
 }
 
+/// This function "takes" the elements at `indices` from the slice `items`.
+pub fn get_at_indices<T: Clone, I: Borrow<usize>>(
+    items: &[T],
+    indices: impl IntoIterator<Item = I>,
+) -> Result<Vec<T>> {
+    indices
+        .into_iter()
+        .map(|idx| items.get(*idx.borrow()).cloned())
+        .collect::<Option<Vec<T>>>()
+        .ok_or_else(|| {
+            DataFusionError::Execution(
+                "Expects indices to be in the range of searched 
vector".to_string(),
+            )
+        })
+}
+
+/// This function finds the longest prefix of the form 0, 1, 2, ... within the
+/// collection `sequence`. Examples:
+/// - For 0, 1, 2, 4, 5; we would produce 3, meaning 0, 1, 2 is the longest 
satisfying
+/// prefix.
+/// - For 1, 2, 3, 4; we would produce 0, meaning there is no such prefix.
+pub fn longest_consecutive_prefix<T: Borrow<usize>>(
+    sequence: impl IntoIterator<Item = T>,
+) -> usize {
+    let mut count = 0;
+    for item in sequence {
+        if !count.eq(item.borrow()) {
+            break;
+        }
+        count += 1;
+    }
+    count
+}
+
 #[cfg(test)]
 mod tests {
     use arrow::array::Float64Array;
     use arrow_array::Array;
+    use std::ops::Range;
     use std::sync::Arc;
 
     use crate::from_slice::FromSlice;
@@ -633,4 +668,22 @@ mod tests {
         }
         Ok(())
     }
+
+    #[test]
+    fn test_get_at_indices() -> Result<()> {
+        let in_vec = vec![1, 2, 3, 4, 5, 6, 7];
+        assert_eq!(get_at_indices(&in_vec, [0, 2])?, vec![1, 3]);
+        assert_eq!(get_at_indices(&in_vec, [4, 2])?, vec![5, 3]);
+        // 7 is outside the range
+        assert!(get_at_indices(&in_vec, [7]).is_err());
+        Ok(())
+    }
+
+    #[test]
+    fn test_longest_consecutive_prefix() {
+        assert_eq!(longest_consecutive_prefix([0, 3, 4]), 1);
+        assert_eq!(longest_consecutive_prefix([0, 1, 3, 4]), 2);
+        assert_eq!(longest_consecutive_prefix([0, 1, 2, 3, 4]), 5);
+        assert_eq!(longest_consecutive_prefix([1, 2, 3, 4]), 0);
+    }
 }
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs 
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 76628fdcb0..71714e65d5 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -37,8 +37,9 @@ use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown};
 use crate::physical_optimizer::utils::{
-    add_sort_above, is_coalesce_partitions, is_limit, is_repartition, is_sort,
-    is_sort_preserving_merge, is_union, is_window,
+    add_sort_above, find_indices, is_coalesce_partitions, is_limit, 
is_repartition,
+    is_sort, is_sort_preserving_merge, is_sorted, is_union, is_window,
+    merge_and_order_indices, set_difference,
 };
 use crate::physical_optimizer::PhysicalOptimizerRule;
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -48,13 +49,14 @@ use crate::physical_plan::windows::{BoundedWindowAggExec, 
WindowAggExec};
 use crate::physical_plan::{with_new_children_if_necessary, Distribution, 
ExecutionPlan};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
+use datafusion_common::utils::{get_at_indices, longest_consecutive_prefix};
 use datafusion_common::DataFusionError;
 use datafusion_physical_expr::utils::{
-    ordering_satisfy, ordering_satisfy_requirement_concrete,
+    convert_to_expr, get_indices_of_matching_exprs, ordering_satisfy,
+    ordering_satisfy_requirement_concrete,
 };
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr, 
PhysicalSortRequirement};
 use itertools::{concat, izip};
-use std::iter::zip;
 use std::sync::Arc;
 
 /// This rule inspects `SortExec`'s in the given physical plan and removes the
@@ -582,40 +584,24 @@ fn analyze_window_sort_removal(
             "Expects to receive either WindowAggExec of 
BoundedWindowAggExec".to_string(),
         ));
     };
-    let n_req = window_exec.required_input_ordering()[0]
-        .as_ref()
-        .map(|elem| elem.len())
-        .unwrap_or(0);
 
     let mut needs_reverse = None;
     for sort_any in sort_tree.get_leaves() {
-        let sort_output_ordering = sort_any.output_ordering();
         // Variable `sort_any` will either be a `SortExec` or a
         // `SortPreservingMergeExec`, and both have a single child.
         // Therefore, we can use the 0th index without loss of generality.
         let sort_input = sort_any.children()[0].clone();
-        let physical_ordering = sort_input.output_ordering();
-        let sort_output_ordering = sort_output_ordering.ok_or_else(|| {
-            DataFusionError::Plan("A SortExec should have output 
ordering".to_string())
-        })?;
-        // It is enough to check whether the first "n_req" elements of the sort
-        // output satisfy window_exec's requirement as it is only "n_req" long.
-        let required_ordering = &sort_output_ordering[0..n_req];
-        if let Some(physical_ordering) = physical_ordering {
-            if let Some(should_reverse) = can_skip_sort(
-                window_expr[0].partition_by(),
-                required_ordering,
-                &sort_input.schema(),
-                physical_ordering,
-            )? {
-                if should_reverse == 
*needs_reverse.get_or_insert(should_reverse) {
-                    continue;
-                }
+        if let Some(should_reverse) = can_skip_sort(
+            window_expr[0].partition_by(),
+            window_expr[0].order_by(),
+            &sort_input,
+        )? {
+            if should_reverse == *needs_reverse.get_or_insert(should_reverse) {
+                continue;
             }
         }
-        // If there is no physical ordering, or we can not skip the sort, or
-        // window reversal requirements are not uniform; then there is no
-        // opportunity for a sort removal -- we immediately return.
+        // We can not skip the sort, or window reversal requirements are not
+        // uniform; then sort removal is not possible -- we immediately return.
         return Ok(None);
     }
     let new_window_expr = if needs_reverse.unwrap() {
@@ -776,67 +762,108 @@ fn get_sort_exprs(sort_any: &Arc<dyn ExecutionPlan>) -> 
Result<&[PhysicalSortExp
     }
 }
 
-#[derive(Debug)]
-/// This structure stores extra column information required to remove 
unnecessary sorts.
-pub struct ColumnInfo {
-    reverse: bool,
-    is_partition: bool,
-}
-
 /// Compares physical ordering and required ordering of all `PhysicalSortExpr`s
 /// to decide whether a `SortExec` before a `WindowAggExec` can be removed.
 /// A `None` return value indicates that we can remove the sort in question.
 /// A `Some(bool)` value indicates otherwise, and signals whether we need to
 /// reverse the ordering in order to remove the sort in question.
-pub fn can_skip_sort(
-    partition_keys: &[Arc<dyn PhysicalExpr>],
-    required: &[PhysicalSortExpr],
-    input_schema: &SchemaRef,
-    physical_ordering: &[PhysicalSortExpr],
+fn can_skip_sort(
+    partitionby_exprs: &[Arc<dyn PhysicalExpr>],
+    orderby_keys: &[PhysicalSortExpr],
+    input: &Arc<dyn ExecutionPlan>,
 ) -> Result<Option<bool>> {
-    if required.len() > physical_ordering.len() {
+    let physical_ordering = if let Some(physical_ordering) = 
input.output_ordering() {
+        physical_ordering
+    } else {
+        // If there is no physical ordering, there is no way to remove a
+        // sort, so immediately return.
+        return Ok(None);
+    };
+    let orderby_exprs = convert_to_expr(orderby_keys);
+    let physical_ordering_exprs = convert_to_expr(physical_ordering);
+    let equal_properties = || input.equivalence_properties();
+    // indices of the order by expressions among input ordering expressions
+    let ob_indices = get_indices_of_matching_exprs(
+        &orderby_exprs,
+        &physical_ordering_exprs,
+        equal_properties,
+    );
+    let contains_all_orderbys = ob_indices.len() == orderby_exprs.len();
+    if !contains_all_orderbys {
+        // If not all ORDER BY expressions are present in the input ordering,
+        // there is no way to remove a sort, so immediately return.
         return Ok(None);
     }
-    let mut col_infos = vec![];
-    for (sort_expr, physical_expr) in zip(required, physical_ordering) {
-        let column = sort_expr.expr.clone();
-        let is_partition = partition_keys.iter().any(|e| e.eq(&column));
-        if let Some(reverse) = check_alignment(input_schema, physical_expr, 
sort_expr)? {
-            col_infos.push(ColumnInfo {
-                reverse,
-                is_partition,
-            });
-        } else {
-            return Ok(None);
-        }
+    // indices of the partition by expressions among input ordering expressions
+    let mut pb_indices = get_indices_of_matching_exprs(
+        partitionby_exprs,
+        &physical_ordering_exprs,
+        equal_properties,
+    );
+    let ordered_merged_indices = merge_and_order_indices(&pb_indices, 
&ob_indices);
+    // Indices of ORDER BY columns that are not among the PARTITION BY columns.
+    // Equivalently (Order by columns) ∖ (Partition by columns) where `∖` 
represents set difference.
+    let unique_ob_indices = set_difference(&ob_indices, &pb_indices);
+    if !is_sorted(&unique_ob_indices) {
+        // ORDER BY indices should be in ascending order.
+        return Ok(None);
     }
-    let partition_by_sections = col_infos
-        .iter()
-        .filter(|c| c.is_partition)
-        .collect::<Vec<_>>();
-    let can_skip_partition_bys = if partition_by_sections.is_empty() {
-        true
+    let first_n = longest_consecutive_prefix(ordered_merged_indices);
+    let furthest_ob_index = *unique_ob_indices.last().unwrap_or(&0);
+    // Cannot skip sort if last order by index is not within consecutive 
prefix.
+    // For instance, if input is ordered by a,b,c,d
+    // for expression `PARTITION BY a, ORDER BY b, d`, `first_n` would be 2 
(meaning a, b defines a prefix for input ordering)
+    // Whereas `furthest_ob_index` would be 3 (column d occurs at the 3rd 
index of the existing ordering.)
+    // Hence existing ordering is not sufficient to run current Executor.
+    // However, for expression `PARTITION BY a, ORDER BY b, c, d`, `first_n` 
would be 4 (meaning a, b, c, d defines a prefix for input ordering)
+    // Similarly, `furthest_ob_index` would be 3 (column d occurs at the 3rd 
index of the existing ordering.)
+    // Hence existing ordering would be sufficient to run current Executor.
+    if first_n <= furthest_ob_index {
+        return Ok(None);
+    }
+    let input_orderby_columns = get_at_indices(physical_ordering, 
&unique_ob_indices)?;
+    let expected_orderby_columns =
+        get_at_indices(orderby_keys, find_indices(&ob_indices, 
unique_ob_indices)?)?;
+    let should_reverse = if let Some(should_reverse) = check_alignments(
+        &input.schema(),
+        &input_orderby_columns,
+        &expected_orderby_columns,
+    )? {
+        should_reverse
     } else {
-        let first_reverse = partition_by_sections[0].reverse;
-        let can_skip_partition_bys = partition_by_sections
-            .iter()
-            .all(|c| c.reverse == first_reverse);
-        can_skip_partition_bys
+        // If the ordering directions are not aligned, we cannot compute the
+        // result without changing the existing ordering.
+        return Ok(None);
     };
-    let order_by_sections = col_infos
-        .iter()
-        .filter(|elem| !elem.is_partition)
-        .collect::<Vec<_>>();
-    let (can_skip_order_bys, should_reverse_order_bys) = if 
order_by_sections.is_empty() {
-        (true, false)
+
+    // Determine how many of the partition by columns define a consecutive
+    // range starting from zero.
+    pb_indices.sort();
+    let first_n = longest_consecutive_prefix(pb_indices);
+    if first_n < partitionby_exprs.len() {
+        return Ok(None);
+    }
+    Ok(Some(should_reverse))
+}
+
+fn check_alignments(
+    schema: &SchemaRef,
+    physical_ordering: &[PhysicalSortExpr],
+    required: &[PhysicalSortExpr],
+) -> Result<Option<bool>> {
+    let res = izip!(physical_ordering, required)
+        .map(|(lhs, rhs)| check_alignment(schema, lhs, rhs))
+        .collect::<Result<Option<Vec<_>>>>()?;
+    Ok(if let Some(res) = res {
+        if !res.is_empty() {
+            let first = res[0];
+            let all_same = res.into_iter().all(|elem| elem == first);
+            all_same.then_some(first)
+        } else {
+            Some(false)
+        }
     } else {
-        let first_reverse = order_by_sections[0].reverse;
-        let can_skip_order_bys =
-            order_by_sections.iter().all(|c| c.reverse == first_reverse);
-        (can_skip_order_bys, first_reverse)
-    };
-    let can_skip = can_skip_order_bys && can_skip_partition_bys;
-    Ok(can_skip.then_some(should_reverse_order_bys))
+        // Cannot skip some of the requirements in the input.
+        None
+    })
 }
 
 /// Compares `physical_ordering` and `required` ordering, decides whether
@@ -883,6 +910,7 @@ mod tests {
     use crate::physical_plan::windows::create_window_expr;
     use crate::physical_plan::{displayable, Partitioning};
     use crate::prelude::SessionContext;
+    use crate::test::csv_exec_sorted;
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
     use datafusion_common::{Result, Statistics};
@@ -909,6 +937,17 @@ mod tests {
         Ok(schema)
     }
 
+    // Generate a schema which consists of 5 columns (a, b, c, d, e)
+    fn create_test_schema3() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, false);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, false);
+        let e = Field::new("e", DataType::Int32, false);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
+        Ok(schema)
+    }
+
     // Util function to get string representation of a physical plan
     fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
         let formatted = displayable(plan.as_ref()).indent().to_string();
@@ -993,6 +1032,109 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_can_skip_ordering_exhaustive() -> Result<()> {
+        let test_schema = create_test_schema3()?;
+        // Columns a,c are nullable whereas b,d are not nullable.
+        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC 
NULLS FIRST, d ASC NULLS FIRST
+        // Column e is not ordered.
+        let sort_exprs = vec![
+            sort_expr("a", &test_schema),
+            sort_expr("b", &test_schema),
+            sort_expr("c", &test_schema),
+            sort_expr("d", &test_schema),
+        ];
+        let exec_unbounded = csv_exec_sorted(&test_schema, sort_exprs, true);
+
+        // Test cases consist of a vector of tuples, where each tuple
+        // represents a single test case.
+        // First field in the tuple is Vec<str> where each element in the 
vector represents PARTITION BY columns
+        // For instance `vec!["a", "b"]` corresponds to PARTITION BY a, b
+        // Second field in the tuple is Vec<str> where each element in the 
vector represents ORDER BY columns
+        // For instance, vec!["c"], corresponds to ORDER BY c ASC NULLS FIRST, 
(ordering is default ordering. We do not check
+        // for reversibility in this test).
+        // Third field in the tuple is Option<bool>, which corresponds to 
expected result for `can_skip_sort` function.
+        // None represents that existing ordering is not sufficient to run 
executor.
+        // (We need to add SortExec to be able to run it).
+        // Some(bool) represents, we can run algorithm with existing ordering. 
If bool is `true`. We should reverse, window expressions
+        // if bool is `false`. Existing window expression can be used as is.
+        let test_cases = vec![
+            (vec!["a"], vec!["a"], Some(false)),
+            (vec!["a"], vec!["b"], Some(false)),
+            (vec!["a"], vec!["c"], None),
+            (vec!["a"], vec!["a", "b"], Some(false)),
+            (vec!["a"], vec!["b", "c"], Some(false)),
+            (vec!["a"], vec!["a", "c"], None),
+            (vec!["a"], vec!["a", "b", "c"], Some(false)),
+            (vec!["b"], vec!["a"], None),
+            (vec!["b"], vec!["b"], None),
+            (vec!["b"], vec!["c"], None),
+            (vec!["b"], vec!["a", "b"], None),
+            (vec!["b"], vec!["b", "c"], None),
+            (vec!["b"], vec!["a", "c"], None),
+            (vec!["b"], vec!["a", "b", "c"], None),
+            (vec!["c"], vec!["a"], None),
+            (vec!["c"], vec!["b"], None),
+            (vec!["c"], vec!["c"], None),
+            (vec!["c"], vec!["a", "b"], None),
+            (vec!["c"], vec!["b", "c"], None),
+            (vec!["c"], vec!["a", "c"], None),
+            (vec!["c"], vec!["a", "b", "c"], None),
+            (vec!["b", "a"], vec!["a"], Some(false)),
+            (vec!["b", "a"], vec!["b"], Some(false)),
+            (vec!["b", "a"], vec!["c"], Some(false)),
+            (vec!["b", "a"], vec!["a", "b"], Some(false)),
+            (vec!["b", "a"], vec!["b", "c"], Some(false)),
+            (vec!["b", "a"], vec!["a", "c"], Some(false)),
+            (vec!["b", "a"], vec!["a", "b", "c"], Some(false)),
+            (vec!["c", "b"], vec!["a"], None),
+            (vec!["c", "b"], vec!["b"], None),
+            (vec!["c", "b"], vec!["c"], None),
+            (vec!["c", "b"], vec!["a", "b"], None),
+            (vec!["c", "b"], vec!["b", "c"], None),
+            (vec!["c", "b"], vec!["a", "c"], None),
+            (vec!["c", "b"], vec!["a", "b", "c"], None),
+            (vec!["c", "a"], vec!["a"], None),
+            (vec!["c", "a"], vec!["b"], None),
+            (vec!["c", "a"], vec!["c"], None),
+            (vec!["c", "a"], vec!["a", "b"], None),
+            (vec!["c", "a"], vec!["b", "c"], None),
+            (vec!["c", "a"], vec!["a", "c"], None),
+            (vec!["c", "a"], vec!["a", "b", "c"], None),
+            (vec!["c", "b", "a"], vec!["a"], Some(false)),
+            (vec!["c", "b", "a"], vec!["b"], Some(false)),
+            (vec!["c", "b", "a"], vec!["c"], Some(false)),
+            (vec!["c", "b", "a"], vec!["a", "b"], Some(false)),
+            (vec!["c", "b", "a"], vec!["b", "c"], Some(false)),
+            (vec!["c", "b", "a"], vec!["a", "c"], Some(false)),
+            (vec!["c", "b", "a"], vec!["a", "b", "c"], Some(false)),
+        ];
+        for (case_idx, test_case) in test_cases.iter().enumerate() {
+            let (partition_by_columns, order_by_params, expected) = &test_case;
+            let mut partition_by_exprs = vec![];
+            for col_name in partition_by_columns {
+                partition_by_exprs.push(col(col_name, &test_schema)?);
+            }
+
+            let mut order_by_exprs = vec![];
+            for col_name in order_by_params {
+                let expr = col(col_name, &test_schema)?;
+                // Give the default ordering; this is the same as the input
+                // ordering direction. In this test we do not check for
+                // reversibility.
+                let options = SortOptions::default();
+                order_by_exprs.push(PhysicalSortExpr { expr, options });
+            }
+            let res =
+                can_skip_sort(&partition_by_exprs, &order_by_exprs, 
&exec_unbounded)?;
+            assert_eq!(
+                res, *expected,
+                "Unexpected result for in unbounded test case#: {:?}, case: 
{:?}",
+                case_idx, test_case
+            );
+        }
+
+        Ok(())
+    }
+
     /// Runs the sort enforcement optimizer and asserts the plan
     /// against the original and expected plans
     ///
diff --git a/datafusion/core/src/physical_optimizer/utils.rs 
b/datafusion/core/src/physical_optimizer/utils.rs
index 06bef0fbda..b20ff556c8 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -19,6 +19,10 @@
 
 use super::optimizer::PhysicalOptimizerRule;
 
+use std::borrow::Borrow;
+use std::collections::HashSet;
+use std::sync::Arc;
+
 use crate::config::ConfigOptions;
 use crate::error::Result;
 use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -30,9 +34,9 @@ use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 use datafusion_common::tree_node::Transformed;
+use datafusion_common::DataFusionError;
 use datafusion_physical_expr::utils::ordering_satisfy;
 use datafusion_physical_expr::PhysicalSortExpr;
-use std::sync::Arc;
 
 /// Convenience rule for writing optimizers: recursively invoke
 /// optimize on plan's children and then return a node of the same
@@ -77,6 +81,64 @@ pub fn add_sort_above(
     Ok(())
 }
 
+/// Find indices of each element in `targets` inside `items`. If one of the
+/// elements is absent in `items`, returns an error.
+pub fn find_indices<T: PartialEq, S: Borrow<T>>(
+    items: &[T],
+    targets: impl IntoIterator<Item = S>,
+) -> Result<Vec<usize>> {
+    targets
+        .into_iter()
+        .map(|target| items.iter().position(|e| target.borrow().eq(e)))
+        .collect::<Option<_>>()
+        .ok_or_else(|| DataFusionError::Execution("Target not 
found".to_string()))
+}
+
+/// Merges collections `first` and `second`, removes duplicates and sorts the
+/// result, returning it as a [`Vec`].
+pub fn merge_and_order_indices<T: Borrow<usize>, S: Borrow<usize>>(
+    first: impl IntoIterator<Item = T>,
+    second: impl IntoIterator<Item = S>,
+) -> Vec<usize> {
+    let mut result: Vec<_> = first
+        .into_iter()
+        .map(|e| *e.borrow())
+        .chain(second.into_iter().map(|e| *e.borrow()))
+        .collect::<HashSet<_>>()
+        .into_iter()
+        .collect();
+    result.sort();
+    result
+}
+
+/// Checks whether the given index sequence is monotonically non-decreasing.
+pub fn is_sorted<T: Borrow<usize>>(sequence: impl IntoIterator<Item = T>) -> 
bool {
+    // TODO: Remove this function when `is_sorted` graduates from Rust nightly.
+    let mut previous = 0;
+    for item in sequence.into_iter() {
+        let current = *item.borrow();
+        if current < previous {
+            return false;
+        }
+        previous = current;
+    }
+    true
+}
+
+/// Calculates the set difference between sequences `first` and `second`,
+/// returning the result as a [`Vec`]. Preserves the ordering of `first`.
+pub fn set_difference<T: Borrow<usize>, S: Borrow<usize>>(
+    first: impl IntoIterator<Item = T>,
+    second: impl IntoIterator<Item = S>,
+) -> Vec<usize> {
+    let set: HashSet<_> = second.into_iter().map(|e| *e.borrow()).collect();
+    first
+        .into_iter()
+        .map(|e| *e.borrow())
+        .filter(|e| !set.contains(e))
+        .collect()
+}
+
 /// Checks whether the given operator is a limit;
 /// i.e. either a [`LocalLimitExec`] or a [`GlobalLimitExec`].
 pub fn is_limit(plan: &Arc<dyn ExecutionPlan>) -> bool {
@@ -113,3 +175,53 @@ pub fn is_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
 pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
     plan.as_any().is::<RepartitionExec>()
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[tokio::test]
+    async fn test_find_indices() -> Result<()> {
+        assert_eq!(find_indices(&[0, 3, 4], [0, 3, 4])?, vec![0, 1, 2]);
+        assert_eq!(find_indices(&[0, 3, 4], [0, 4, 3])?, vec![0, 2, 1]);
+        assert_eq!(find_indices(&[3, 0, 4], [0, 3])?, vec![1, 0]);
+        assert!(find_indices(&[0, 3], [0, 3, 4]).is_err());
+        assert!(find_indices(&[0, 3, 4], [0, 2]).is_err());
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_merge_and_order_indices() {
+        assert_eq!(
+            merge_and_order_indices([0, 3, 4], [1, 3, 5]),
+            vec![0, 1, 3, 4, 5]
+        );
+        // Result should be ordered, even if inputs are not
+        assert_eq!(
+            merge_and_order_indices([3, 0, 4], [5, 1, 3]),
+            vec![0, 1, 3, 4, 5]
+        );
+    }
+
+    #[tokio::test]
+    async fn test_is_sorted() {
+        assert!(is_sorted::<usize>([]));
+        assert!(is_sorted([0]));
+        assert!(is_sorted([0, 3, 4]));
+        assert!(is_sorted([0, 1, 2]));
+        assert!(is_sorted([0, 1, 4]));
+        assert!(is_sorted([0usize; 0]));
+        assert!(is_sorted([1, 2]));
+        assert!(!is_sorted([3, 2]));
+    }
+
+    #[tokio::test]
+    async fn test_set_difference() {
+        assert_eq!(set_difference([0, 3, 4], [1, 2]), vec![0, 3, 4]);
+        assert_eq!(set_difference([0, 3, 4], [1, 2, 4]), vec![0, 3]);
+        // The return value should preserve the ordering of the first argument.
+        assert_eq!(set_difference([3, 4, 0], [1, 2, 4]), vec![3, 0]);
+        assert_eq!(set_difference([0, 3, 4], [4, 1, 2]), vec![0, 3]);
+        assert_eq!(set_difference([3, 4, 0], [4, 1, 2]), vec![3, 0]);
+    }
+}
diff --git 
a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs 
b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index cc35541fc6..a8d02bcd42 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -46,8 +46,10 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use crate::physical_plan::windows::calc_requirements;
-use datafusion_common::utils::evaluate_partition_ranges;
+use crate::physical_plan::windows::{
+    calc_requirements, get_ordered_partition_by_indices,
+};
+use datafusion_common::utils::{evaluate_partition_ranges, get_at_indices};
 use datafusion_physical_expr::window::{
     PartitionBatchState, PartitionBatches, PartitionKey, 
PartitionWindowAggStates,
     WindowAggState, WindowState,
@@ -73,6 +75,9 @@ pub struct BoundedWindowAggExec {
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Partition by indices that define a prefix of the existing ordering.
+    // see `get_ordered_partition_by_indices` for more details.
+    ordered_partition_by_indices: Vec<usize>,
 }
 
 impl BoundedWindowAggExec {
@@ -85,6 +90,9 @@ impl BoundedWindowAggExec {
     ) -> Result<Self> {
         let schema = create_schema(&input_schema, &window_expr)?;
         let schema = Arc::new(schema);
+
+        let ordered_partition_by_indices =
+            get_ordered_partition_by_indices(window_expr[0].partition_by(), 
&input);
         Ok(Self {
             input,
             window_expr,
@@ -92,6 +100,7 @@ impl BoundedWindowAggExec {
             input_schema,
             partition_keys,
             metrics: ExecutionPlanMetricsSet::new(),
+            ordered_partition_by_indices,
         })
     }
 
@@ -116,20 +125,9 @@ impl BoundedWindowAggExec {
     // Hence returned `PhysicalSortExpr` corresponding to `PARTITION BY` 
columns can be used safely
     // to calculate partition separation points
     pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
-        let mut result = vec![];
-        // All window exprs have the same partition by, so we just use the 
first one:
-        let partition_by = self.window_expr()[0].partition_by();
+        // Partition by sort keys indices are stored in 
self.ordered_partition_by_indices.
         let sort_keys = self.input.output_ordering().unwrap_or(&[]);
-        for item in partition_by {
-            if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
-                result.push(a.clone());
-            } else {
-                return Err(DataFusionError::Internal(
-                    "Partition key not found in sort keys".to_string(),
-                ));
-            }
-        }
-        Ok(result)
+        get_at_indices(sort_keys, &self.ordered_partition_by_indices)
     }
 }
 
@@ -166,8 +164,15 @@ impl ExecutionPlan for BoundedWindowAggExec {
     fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
         let partition_bys = self.window_expr()[0].partition_by();
         let order_keys = self.window_expr()[0].order_by();
-        let requirements = calc_requirements(partition_bys, order_keys);
-        vec![requirements]
+        if self.ordered_partition_by_indices.len() < partition_bys.len() {
+            vec![calc_requirements(partition_bys, order_keys)]
+        } else {
+            let partition_bys = self
+                .ordered_partition_by_indices
+                .iter()
+                .map(|idx| &partition_bys[*idx]);
+            vec![calc_requirements(partition_bys, order_keys)]
+        }
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
@@ -211,7 +216,8 @@ impl ExecutionPlan for BoundedWindowAggExec {
             input,
             BaselineMetrics::new(&self.metrics, partition),
             self.partition_by_sort_keys()?,
-        ));
+            self.ordered_partition_by_indices.clone(),
+        )?);
         Ok(stream)
     }
 
@@ -319,6 +325,7 @@ pub struct SortedPartitionByBoundedWindowStream {
     window_expr: Vec<Arc<dyn WindowExpr>>,
     partition_by_sort_keys: Vec<PhysicalSortExpr>,
     baseline_metrics: BaselineMetrics,
+    ordered_partition_by_indices: Vec<usize>,
 }
 
 impl PartitionByHandler for SortedPartitionByBoundedWindowStream {
@@ -429,10 +436,19 @@ impl SortedPartitionByBoundedWindowStream {
         input: SendableRecordBatchStream,
         baseline_metrics: BaselineMetrics,
         partition_by_sort_keys: Vec<PhysicalSortExpr>,
-    ) -> Self {
+        ordered_partition_by_indices: Vec<usize>,
+    ) -> Result<Self> {
         let state = window_expr.iter().map(|_| IndexMap::new()).collect();
         let empty_batch = RecordBatch::new_empty(schema.clone());
-        Self {
+
+        // In BoundedWindowAggExec all partition by columns should be ordered.
+        if window_expr[0].partition_by().len() != 
ordered_partition_by_indices.len() {
+            return Err(DataFusionError::Internal(
+                "All partition by columns should have an ordering".to_string(),
+            ));
+        }
+
+        Ok(Self {
             schema,
             input,
             input_buffer: empty_batch,
@@ -442,7 +458,8 @@ impl SortedPartitionByBoundedWindowStream {
             window_expr,
             baseline_metrics,
             partition_by_sort_keys,
-        }
+            ordered_partition_by_indices,
+        })
     }
 
     fn compute_aggregates(&mut self) -> Result<RecordBatch> {
@@ -621,10 +638,10 @@ impl SortedPartitionByBoundedWindowStream {
 
     /// Get Partition Columns
     pub fn partition_columns(&self, batch: &RecordBatch) -> 
Result<Vec<SortColumn>> {
-        self.partition_by_sort_keys
+        self.ordered_partition_by_indices
             .iter()
-            .map(|e| e.evaluate_to_sort_column(batch))
-            .collect::<Result<Vec<_>>>()
+            .map(|idx| 
self.partition_by_sort_keys[*idx].evaluate_to_sort_column(batch))
+            .collect()
     }
 }
 
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs 
b/datafusion/core/src/physical_plan/windows/mod.rs
index d418e4b3d5..eea0749e94 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -25,7 +25,7 @@ use crate::physical_plan::{
         PhysicalSortExpr, RowNumber,
     },
     type_coercion::coerce,
-    udaf, PhysicalExpr,
+    udaf, ExecutionPlan, PhysicalExpr,
 };
 use crate::scalar::ScalarValue;
 use arrow::datatypes::Schema;
@@ -36,6 +36,7 @@ use datafusion_expr::{
 use datafusion_physical_expr::window::{
     BuiltInWindowFunctionExpr, SlidingAggregateWindowExpr,
 };
+use std::borrow::Borrow;
 use std::convert::TryInto;
 use std::sync::Arc;
 
@@ -43,6 +44,8 @@ mod bounded_window_agg_exec;
 mod window_agg_exec;
 
 pub use bounded_window_agg_exec::BoundedWindowAggExec;
+use datafusion_common::utils::longest_consecutive_prefix;
+use datafusion_physical_expr::utils::{convert_to_expr, 
get_indices_of_matching_exprs};
 pub use datafusion_physical_expr::window::{
     BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
 };
@@ -188,24 +191,56 @@ fn create_built_in_window_expr(
     })
 }
 
-pub(crate) fn calc_requirements(
-    partition_by_exprs: &[Arc<dyn PhysicalExpr>],
-    orderby_sort_exprs: &[PhysicalSortExpr],
+pub(crate) fn calc_requirements<
+    T: Borrow<Arc<dyn PhysicalExpr>>,
+    S: Borrow<PhysicalSortExpr>,
+>(
+    partition_by_exprs: impl IntoIterator<Item = T>,
+    orderby_sort_exprs: impl IntoIterator<Item = S>,
 ) -> Option<Vec<PhysicalSortRequirement>> {
-    let mut sort_reqs = vec![];
-    for partition_by in partition_by_exprs {
-        sort_reqs.push(PhysicalSortRequirement::new(partition_by.clone(), 
None))
-    }
-    for sort_expr in orderby_sort_exprs {
-        let contains = sort_reqs.iter().any(|e| sort_expr.expr.eq(e.expr()));
-        if !contains {
-            sort_reqs.push(PhysicalSortRequirement::from(sort_expr.clone()));
+    let mut sort_reqs = partition_by_exprs
+        .into_iter()
+        .map(|partition_by| {
+            PhysicalSortRequirement::new(partition_by.borrow().clone(), None)
+        })
+        .collect::<Vec<_>>();
+    for element in orderby_sort_exprs.into_iter() {
+        let PhysicalSortExpr { expr, options } = element.borrow();
+        if !sort_reqs.iter().any(|e| e.expr().eq(expr)) {
+            sort_reqs.push(PhysicalSortRequirement::new(expr.clone(), 
Some(*options)));
         }
     }
     // Convert empty result to None. Otherwise wrap result inside Some()
     (!sort_reqs.is_empty()).then_some(sort_reqs)
 }
 
+/// This function calculates the indices such that, when the partition by
+/// expressions are reordered with these indices, the resulting expressions
+/// form a prefix of the existing ordering.
+// For instance, if the input is ordered by a, b, c and PARTITION BY b, a is used,
+// this vector will be [1, 0]. It means that when we iterate the b, a columns in
+// the order [1, 0], the resulting vector (a, b) is a prefix of the existing
+// ordering (a, b, c).
+pub(crate) fn get_ordered_partition_by_indices(
+    partition_by_exprs: &[Arc<dyn PhysicalExpr>],
+    input: &Arc<dyn ExecutionPlan>,
+) -> Vec<usize> {
+    let input_ordering = input.output_ordering().unwrap_or(&[]);
+    let input_ordering_exprs = convert_to_expr(input_ordering);
+    let equal_properties = || input.equivalence_properties();
+    let input_places = get_indices_of_matching_exprs(
+        &input_ordering_exprs,
+        partition_by_exprs,
+        equal_properties,
+    );
+    let mut partition_places = get_indices_of_matching_exprs(
+        partition_by_exprs,
+        &input_ordering_exprs,
+        equal_properties,
+    );
+    partition_places.sort();
+    let first_n = longest_consecutive_prefix(partition_places);
+    input_places[0..first_n].to_vec()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -215,7 +250,7 @@ mod tests {
     use crate::physical_plan::{collect, ExecutionPlan};
     use crate::prelude::SessionContext;
     use crate::test::exec::{assert_strong_count_converges_to_zero, 
BlockingExec};
-    use crate::test::{self, assert_is_pending};
+    use crate::test::{self, assert_is_pending, csv_exec_sorted};
     use arrow::array::*;
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, SchemaRef};
@@ -235,10 +270,62 @@ mod tests {
         let b = Field::new("b", DataType::Int32, true);
         let c = Field::new("c", DataType::Int32, true);
         let d = Field::new("d", DataType::Int32, true);
-        let schema = Arc::new(Schema::new(vec![a, b, c, d]));
+        let e = Field::new("e", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d, e]));
         Ok(schema)
     }
 
+    /// make PhysicalSortExpr with default options
+    fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
+        sort_expr_options(name, schema, SortOptions::default())
+    }
+
+    /// PhysicalSortExpr with specified options
+    fn sort_expr_options(
+        name: &str,
+        schema: &Schema,
+        options: SortOptions,
+    ) -> PhysicalSortExpr {
+        PhysicalSortExpr {
+            expr: col(name, schema).unwrap(),
+            options,
+        }
+    }
+
+    #[tokio::test]
+    async fn test_get_partition_by_ordering() -> Result<()> {
+        let test_schema = create_test_schema2()?;
+        // Columns a,c are nullable whereas b,d are not nullable.
+        // Source is sorted by a ASC NULLS FIRST, b ASC NULLS FIRST, c ASC 
NULLS FIRST, d ASC NULLS FIRST
+        // Column e is not ordered.
+        let sort_exprs = vec![
+            sort_expr("a", &test_schema),
+            sort_expr("b", &test_schema),
+            sort_expr("c", &test_schema),
+            sort_expr("d", &test_schema),
+        ];
+        // Input is ordered by a,b,c,d
+        let input = csv_exec_sorted(&test_schema, sort_exprs, true);
+        let test_data = vec![
+            (vec!["a", "b"], vec![0, 1]),
+            (vec!["b", "a"], vec![1, 0]),
+            (vec!["b", "a", "c"], vec![1, 0, 2]),
+            (vec!["d", "b", "a"], vec![2, 1]),
+            (vec!["d", "e", "a"], vec![2]),
+        ];
+        for (pb_names, expected) in test_data {
+            let pb_exprs = pb_names
+                .iter()
+                .map(|name| col(name, &test_schema))
+                .collect::<Result<Vec<_>>>()?;
+            assert_eq!(
+                get_ordered_partition_by_indices(&pb_exprs, &input),
+                expected
+            );
+        }
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_calc_requirements() -> Result<()> {
         let schema = create_test_schema2()?;
@@ -298,7 +385,7 @@ mod tests {
                     expected = Some(vec![res]);
                 }
             }
-            assert_eq!(calc_requirements(&partitionbys, &orderbys), expected);
+            assert_eq!(calc_requirements(partitionbys, orderbys), expected);
         }
         Ok(())
     }
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs 
b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index 520784fb9a..bda1b52ff8 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -24,7 +24,9 @@ use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
-use crate::physical_plan::windows::calc_requirements;
+use crate::physical_plan::windows::{
+    calc_requirements, get_ordered_partition_by_indices,
+};
 use crate::physical_plan::{
     ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
     ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
@@ -38,7 +40,7 @@ use arrow::{
     datatypes::{Schema, SchemaRef},
     record_batch::RecordBatch,
 };
-use datafusion_common::utils::evaluate_partition_ranges;
+use datafusion_common::utils::{evaluate_partition_ranges, get_at_indices};
 use datafusion_common::DataFusionError;
 use datafusion_physical_expr::PhysicalSortRequirement;
 use futures::stream::Stream;
@@ -63,6 +65,9 @@ pub struct WindowAggExec {
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Partition by indices that define a prefix of the existing ordering;
+    // see `get_ordered_partition_by_indices` for more details.
+    ordered_partition_by_indices: Vec<usize>,
 }
 
 impl WindowAggExec {
@@ -76,6 +81,8 @@ impl WindowAggExec {
         let schema = create_schema(&input_schema, &window_expr)?;
         let schema = Arc::new(schema);
 
+        let ordered_partition_by_indices =
+            get_ordered_partition_by_indices(window_expr[0].partition_by(), 
&input);
         Ok(Self {
             input,
             window_expr,
@@ -83,6 +90,7 @@ impl WindowAggExec {
             input_schema,
             partition_keys,
             metrics: ExecutionPlanMetricsSet::new(),
+            ordered_partition_by_indices,
         })
     }
 
@@ -107,20 +115,9 @@ impl WindowAggExec {
     // Hence returned `PhysicalSortExpr` corresponding to `PARTITION BY` 
columns can be used safely
     // to calculate partition separation points
     pub fn partition_by_sort_keys(&self) -> Result<Vec<PhysicalSortExpr>> {
-        let mut result = vec![];
-        // All window exprs have the same partition by, so we just use the 
first one:
-        let partition_by = self.window_expr()[0].partition_by();
+        // The indices of the partition by sort keys are stored in
+        // `self.ordered_partition_by_indices`.
         let sort_keys = self.input.output_ordering().unwrap_or(&[]);
-        for item in partition_by {
-            if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
-                result.push(a.clone());
-            } else {
-                return Err(DataFusionError::Execution(
-                    "Partition key not found in sort keys".to_string(),
-                ));
-            }
-        }
-        Ok(result)
+        get_at_indices(sort_keys, &self.ordered_partition_by_indices)
     }
 }
 
@@ -171,8 +168,15 @@ impl ExecutionPlan for WindowAggExec {
     fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
         let partition_bys = self.window_expr()[0].partition_by();
         let order_keys = self.window_expr()[0].order_by();
-        let requirements = calc_requirements(partition_bys, order_keys);
-        vec![requirements]
+        if self.ordered_partition_by_indices.len() < partition_bys.len() {
+            vec![calc_requirements(partition_bys, order_keys)]
+        } else {
+            let partition_bys = self
+                .ordered_partition_by_indices
+                .iter()
+                .map(|idx| &partition_bys[*idx]);
+            vec![calc_requirements(partition_bys, order_keys)]
+        }
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
@@ -211,7 +215,8 @@ impl ExecutionPlan for WindowAggExec {
             input,
             BaselineMetrics::new(&self.metrics, partition),
             self.partition_by_sort_keys()?,
-        ));
+            self.ordered_partition_by_indices.clone(),
+        )?);
         Ok(stream)
     }
 
@@ -300,6 +305,7 @@ pub struct WindowAggStream {
     window_expr: Vec<Arc<dyn WindowExpr>>,
     partition_by_sort_keys: Vec<PhysicalSortExpr>,
     baseline_metrics: BaselineMetrics,
+    ordered_partition_by_indices: Vec<usize>,
 }
 
 impl WindowAggStream {
@@ -310,8 +316,15 @@ impl WindowAggStream {
         input: SendableRecordBatchStream,
         baseline_metrics: BaselineMetrics,
         partition_by_sort_keys: Vec<PhysicalSortExpr>,
-    ) -> Self {
-        Self {
+        ordered_partition_by_indices: Vec<usize>,
+    ) -> Result<Self> {
+        // In WindowAggExec all partition by columns should be ordered.
+        if window_expr[0].partition_by().len() != 
ordered_partition_by_indices.len() {
+            return Err(DataFusionError::Internal(
+                "All partition by columns should have an ordering".to_string(),
+            ));
+        }
+        Ok(Self {
             schema,
             input,
             batches: vec![],
@@ -319,7 +332,8 @@ impl WindowAggStream {
             window_expr,
             baseline_metrics,
             partition_by_sort_keys,
-        }
+            ordered_partition_by_indices,
+        })
     }
 
     fn compute_aggregates(&self) -> Result<RecordBatch> {
@@ -331,9 +345,9 @@ impl WindowAggStream {
         }
 
         let partition_by_sort_keys = self
-            .partition_by_sort_keys
+            .ordered_partition_by_indices
             .iter()
-            .map(|elem| elem.evaluate_to_sort_column(&batch))
+            .map(|idx| 
self.partition_by_sort_keys[*idx].evaluate_to_sort_column(&batch))
             .collect::<Result<Vec<_>>>()?;
         let partition_points =
             evaluate_partition_ranges(batch.num_rows(), 
&partition_by_sort_keys)?;
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index e2035fb227..f99071a6f1 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -38,7 +38,8 @@ use arrow::record_batch::{RecordBatch, RecordBatchOptions};
 use bzip2::write::BzEncoder;
 #[cfg(feature = "compression")]
 use bzip2::Compression as BzCompression;
-use datafusion_common::DataFusionError;
+use datafusion_common::{DataFusionError, Statistics};
+use datafusion_physical_expr::PhysicalSortExpr;
 #[cfg(feature = "compression")]
 use flate2::write::GzEncoder;
 #[cfg(feature = "compression")]
@@ -326,6 +327,32 @@ pub fn create_vec_batches(schema: &Schema, n: usize) -> 
Vec<RecordBatch> {
     vec
 }
 
+/// Creates a sorted CSV exec
+pub fn csv_exec_sorted(
+    schema: &SchemaRef,
+    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+    infinite_source: bool,
+) -> Arc<dyn ExecutionPlan> {
+    let sort_exprs = sort_exprs.into_iter().collect();
+
+    Arc::new(CsvExec::new(
+        FileScanConfig {
+            object_store_url: ObjectStoreUrl::parse("test:///").unwrap(),
+            file_schema: schema.clone(),
+            file_groups: vec![vec![PartitionedFile::new("x".to_string(), 
100)]],
+            statistics: Statistics::default(),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            output_ordering: Some(sort_exprs),
+            infinite_source,
+        },
+        false,
+        0,
+        FileCompressionType::UNCOMPRESSED,
+    ))
+}
+
 /// Create batch
 fn create_batch(schema: &Schema) -> RecordBatch {
     RecordBatch::try_new(
diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt 
b/datafusion/core/tests/sqllogictests/test_files/window.slt
index 2213c95ec7..0170eed929 100644
--- a/datafusion/core/tests/sqllogictests/test_files/window.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/window.slt
@@ -2035,8 +2035,115 @@ SELECT
 -1114 -1927628110
 15673 -1899175111
 
+# test_window_agg_partition_by_set
+# These tests checks for whether BoundedWindowAggExec and WindowAggExec treats 
partition by expressions as set.
+# Physical plan shouldn't have any SortExec in between Window Executors.
+
 statement ok
-set datafusion.execution.target_partitions = 2;
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING 
AND 5 FOLLOWING) as sum1,
+    SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING 
AND 5 FOLLOWING) as sum2,
+    SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9, c8 ASC ROWS BETWEEN 1 
PRECEDING AND UNBOUNDED FOLLOWING) as sum3,
+    SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9, c8 ASC ROWS BETWEEN 1 
PRECEDING AND UNBOUNDED FOLLOWING) as sum4
+    FROM aggregate_test_100
+    ORDER BY c9
+    LIMIT 5
+----
+logical_plan
+Limit: skip=0, fetch=5
+  Sort: aggregate_test_100.c9 ASC NULLS LAST, fetch=5
+    Projection: aggregate_test_100.c9, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, 
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, 
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1, aggregate_test_100. [...]
+      WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c2, aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING 
AND UNBOUNDED FOLLOWING]]
+          WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+            WindowAggr: windowExpr=[[SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST, aggregate_test_100.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING 
AND UNBOUNDED FOLLOWING]]
+              TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, 
c6, c7, c8, c9, c10, c11, c12, c13]
+physical_plan
+GlobalLimitExec: skip=0, fetch=5
+  SortExec: fetch=5, expr=[c9@0 ASC NULLS LAST]
+    ProjectionExec: expr=[c9@8 as c9, SUM(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1, aggregate_test_100.c2] ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@14 as sum1, 
SUM(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c2, 
aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING@16 as sum2, SUM(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1, aggregate_test [...]
+      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+        WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { 
name: "SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+            WindowAggExec: wdw=[SUM(aggregate_test_100.c9): Ok(Field { name: 
"SUM(aggregate_test_100.c9)", data_type: UInt64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+              SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@8 ASC 
NULLS LAST,c8@7 ASC NULLS LAST]
+                CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13]
+
+
+query IIIII
+SELECT c9,
+   SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING 
AND 5 FOLLOWING) as sum1,
+   SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING 
AND 5 FOLLOWING) as sum2,
+   SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9, c8 ASC ROWS BETWEEN 1 
PRECEDING AND UNBOUNDED FOLLOWING) as sum3,
+   SUM(c9) OVER(PARTITION BY c2, c1 ORDER BY c9, c8 ASC ROWS BETWEEN 1 
PRECEDING AND UNBOUNDED FOLLOWING) as sum4
+   FROM aggregate_test_100
+   ORDER BY c9
+   LIMIT 5
+----
+28774375 9144476174 9144476174 12665844451 12665844451
+63044568 5125627947 5125627947 5125627947 5125627947
+141047417 3650978969 3650978969 3650978969 3650978969
+141680161 8526017165 8526017165 11924524414 11924524414
+145294611 6802765992 6802765992 6802765992 6802765992
+
+
+# test_window_agg_child_equivalence
+
+query TT
+EXPLAIN SELECT c9,
+  SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING 
AND 5 FOLLOWING) as sum1,
+  SUM(c9) OVER(PARTITION BY c2, c1_alias ORDER BY c9 ASC ROWS BETWEEN 1 
PRECEDING AND 5 FOLLOWING) as sum2,
+  SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9, c8 ASC ROWS BETWEEN 1 
PRECEDING AND UNBOUNDED FOLLOWING) as sum3,
+  SUM(c9) OVER(PARTITION BY c2, c1_alias ORDER BY c9, c8 ASC ROWS BETWEEN 1 
PRECEDING AND UNBOUNDED FOLLOWING) as sum4
+  FROM (SELECT c1, c2, c8, c9, c1 as c1_alias
+        FROM aggregate_test_100
+        ORDER BY c9) t1
+  LIMIT 5
+----
+logical_plan
+Projection: t1.c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER BY [t1.c9 ASC 
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, SUM(t1.c9) 
PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 
1 PRECEDING AND 5 FOLLOWING AS sum2, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] 
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING 
AND UNBOUNDED FOLLOWING AS sum3, SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] 
ORDER BY [t1.c9 ASC NULLS LAS [...]
+  Limit: skip=0, fetch=5
+    WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] 
ORDER BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+      WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] 
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING 
AND UNBOUNDED FOLLOWING]]
+        WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER 
BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]]
+          WindowAggr: windowExpr=[[SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] 
ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING 
AND UNBOUNDED FOLLOWING]]
+            SubqueryAlias: t1
+              Sort: aggregate_test_100.c9 ASC NULLS LAST
+                Projection: aggregate_test_100.c1, aggregate_test_100.c2, 
aggregate_test_100.c8, aggregate_test_100.c9, aggregate_test_100.c1 AS c1_alias
+                  TableScan: aggregate_test_100 projection=[c1, c2, c8, c9]
+physical_plan
+ProjectionExec: expr=[c9@3 as c9, SUM(t1.c9) PARTITION BY [t1.c1, t1.c2] ORDER 
BY [t1.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@6 as sum1, 
SUM(t1.c9) PARTITION BY [t1.c2, t1.c1_alias] ORDER BY [t1.c9 ASC NULLS LAST] 
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@8 as sum2, SUM(t1.c9) PARTITION BY 
[t1.c1, t1.c2] ORDER BY [t1.c9 ASC NULLS LAST, t1.c8 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING@5 as sum3, SUM(t1.c9) PARTITION BY 
[t1.c2, t1.c1_alias] ORDER BY [...]
+  GlobalLimitExec: skip=0, fetch=5
+    BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", 
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+      WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", 
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+        BoundedWindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", 
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(5)) }]
+          WindowAggExec: wdw=[SUM(t1.c9): Ok(Field { name: "SUM(t1.c9)", 
data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(1)), end_bound: Following(UInt64(NULL)) }]
+            SortExec: expr=[c1@0 ASC NULLS LAST,c2@1 ASC NULLS LAST,c9@3 ASC 
NULLS LAST,c8@2 ASC NULLS LAST]
+              ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c8@2 as c8, c9@3 
as c9, c1@0 as c1_alias]
+                CsvExec: files={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, has_header=true, 
limit=None, projection=[c1, c2, c8, c9]
+
+
+query IIIII
+SELECT c9,
+  SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING 
AND 5 FOLLOWING) as sum1,
+  SUM(c9) OVER(PARTITION BY c2, c1_alias ORDER BY c9 ASC ROWS BETWEEN 1 
PRECEDING AND 5 FOLLOWING) as sum2,
+  SUM(c9) OVER(PARTITION BY c1, c2 ORDER BY c9, c8 ASC ROWS BETWEEN 1 
PRECEDING AND UNBOUNDED FOLLOWING) as sum3,
+  SUM(c9) OVER(PARTITION BY c2, c1_alias ORDER BY c9, c8 ASC ROWS BETWEEN 1 
PRECEDING AND UNBOUNDED FOLLOWING) as sum4
+  FROM (SELECT c1, c2, c8, c9, c1 as c1_alias
+        FROM aggregate_test_100
+        ORDER BY c9) t1
+  LIMIT 5
+----
+774637006 12189635055 12189635055 12189635055 12189635055
+1454057357 12189635055 12189635055 12189635055 12189635055
+2669374863 11414998049 11414998049 11414998049 11414998049
+3276123488 9960940692 9960940692 9960940692 9960940692
+4015442341 7291565829 7291565829 7291565829 7291565829
+
 
 # test_window_agg_with_bounded_group
 query TT
diff --git a/datafusion/physical-expr/src/utils.rs 
b/datafusion/physical-expr/src/utils.rs
index 1dd7a2609a..e459246692 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -29,6 +29,7 @@ use datafusion_common::tree_node::{
 };
 use petgraph::graph::NodeIndex;
 use petgraph::stable_graph::StableGraph;
+use std::borrow::Borrow;
 use std::collections::HashMap;
 use std::collections::HashSet;
 use std::sync::Arc;
@@ -153,19 +154,14 @@ pub fn normalize_expr_with_equivalence_properties(
     expr.clone()
         .transform(&|expr| {
             let normalized_form: Option<Arc<dyn PhysicalExpr>> =
-                match expr.as_any().downcast_ref::<Column>() {
-                    Some(column) => {
-                        let mut normalized: Option<Arc<dyn PhysicalExpr>> = 
None;
-                        for class in eq_properties {
-                            if class.contains(column) {
-                                normalized = 
Some(Arc::new(class.head().clone()));
-                                break;
-                            }
+                expr.as_any().downcast_ref::<Column>().and_then(|column| {
+                    for class in eq_properties {
+                        if class.contains(column) {
+                            return Some(Arc::new(class.head().clone()) as _);
                         }
-                        normalized
                     }
-                    None => None,
-                };
+                    None
+                });
             Ok(if let Some(normalized_form) = normalized_form {
                 Transformed::Yes(normalized_form)
             } else {
@@ -376,16 +372,61 @@ pub fn map_columns_before_projection(
     parent_required
         .iter()
         .filter_map(|r| {
-            if let Some(column) = r.as_any().downcast_ref::<Column>() {
-                column_mapping.get(column.name())
-            } else {
-                None
-            }
+            r.as_any()
+                .downcast_ref::<Column>()
+                .and_then(|c| column_mapping.get(c.name()))
         })
         .map(|e| Arc::new(e.clone()) as _)
         .collect()
 }
 
+/// This function returns all `Arc<dyn PhysicalExpr>`s inside the given
+/// `PhysicalSortExpr` sequence.
+pub fn convert_to_expr<T: Borrow<PhysicalSortExpr>>(
+    sequence: impl IntoIterator<Item = T>,
+) -> Vec<Arc<dyn PhysicalExpr>> {
+    sequence
+        .into_iter()
+        .map(|elem| elem.borrow().expr.clone())
+        .collect()
+}
+
+/// This function finds the indices of `targets` within `items`, taking into
+/// account equivalences according to `equal_properties`.
+pub fn get_indices_of_matching_exprs<
+    T: Borrow<Arc<dyn PhysicalExpr>>,
+    F: FnOnce() -> EquivalenceProperties,
+>(
+    targets: impl IntoIterator<Item = T>,
+    items: &[Arc<dyn PhysicalExpr>],
+    equal_properties: F,
+) -> Vec<usize> {
+    if let eq_classes @ [_, ..] = equal_properties().classes() {
+        let normalized_targets = targets.into_iter().map(|e| {
+            normalize_expr_with_equivalence_properties(e.borrow().clone(), 
eq_classes)
+        });
+        let normalized_items = items
+            .iter()
+            .map(|e| normalize_expr_with_equivalence_properties(e.clone(), 
eq_classes))
+            .collect::<Vec<_>>();
+        get_indices_of_exprs_strict(normalized_targets, &normalized_items)
+    } else {
+        get_indices_of_exprs_strict(targets, items)
+    }
+}
+
+/// This function finds the indices of `targets` within `items` using strict
+/// equality.
+fn get_indices_of_exprs_strict<T: Borrow<Arc<dyn PhysicalExpr>>>(
+    targets: impl IntoIterator<Item = T>,
+    items: &[Arc<dyn PhysicalExpr>],
+) -> Vec<usize> {
+    targets
+        .into_iter()
+        .filter_map(|target| items.iter().position(|e| e.eq(target.borrow())))
+        .collect()
+}
+
 #[derive(Clone, Debug)]
 pub struct ExprTreeNode<T> {
     expr: Arc<dyn PhysicalExpr>,
@@ -674,6 +715,42 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_convert_to_expr() -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", DataType::UInt64, 
false)]);
+        let sort_expr = vec![PhysicalSortExpr {
+            expr: col("a", &schema)?,
+            options: Default::default(),
+        }];
+        assert!(convert_to_expr(&sort_expr)[0].eq(&sort_expr[0].expr));
+        Ok(())
+    }
+
+    #[test]
+    fn test_get_indices_of_matching_exprs() {
+        let empty_schema = &Arc::new(Schema::empty());
+        let equal_properties = || 
EquivalenceProperties::new(empty_schema.clone());
+        let list1: Vec<Arc<dyn PhysicalExpr>> = vec![
+            Arc::new(Column::new("a", 0)),
+            Arc::new(Column::new("b", 1)),
+            Arc::new(Column::new("c", 2)),
+            Arc::new(Column::new("d", 3)),
+        ];
+        let list2: Vec<Arc<dyn PhysicalExpr>> = vec![
+            Arc::new(Column::new("b", 1)),
+            Arc::new(Column::new("c", 2)),
+            Arc::new(Column::new("a", 0)),
+        ];
+        assert_eq!(
+            get_indices_of_matching_exprs(&list1, &list2, equal_properties),
+            vec![2, 0, 1]
+        );
+        assert_eq!(
+            get_indices_of_matching_exprs(&list2, &list1, equal_properties),
+            vec![1, 2, 0]
+        );
+    }
+
     #[test]
     fn expr_list_eq_test() -> Result<()> {
         let list1: Vec<Arc<dyn PhysicalExpr>> = vec![

Reply via email to