This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c9bf3f3d1 Change required input ordering physical plan API to allow any NULLS FIRST / LAST and ASC / DESC (#5772)
c9bf3f3d1 is described below

commit c9bf3f3d1da7fd55b6d7c07d7347ea3ff56dc577
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Mar 30 16:17:33 2023 +0300

    Change required input ordering physical plan API to allow any NULLS FIRST / LAST and ASC / DESC (#5772)
    
    * Change required input ordering format to not absolutely require a direction.
    
    * remove unnecessary code
---
 .../core/src/physical_optimizer/repartition.rs     |   9 +-
 .../src/physical_optimizer/sort_enforcement.rs     |  32 +++---
 .../src/physical_plan/joins/sort_merge_join.rs     |  10 +-
 .../src/physical_plan/joins/symmetric_hash_join.rs |  14 ++-
 datafusion/core/src/physical_plan/mod.rs           |   4 +-
 datafusion/core/src/physical_plan/planner.rs       |  29 ------
 .../physical_plan/sorts/sort_preserving_merge.rs   |  14 ++-
 .../windows/bounded_window_agg_exec.rs             |  21 ++--
 datafusion/core/src/physical_plan/windows/mod.rs   | 102 +++++++++++++++++-
 .../src/physical_plan/windows/window_agg_exec.rs   |  17 ++-
 datafusion/core/tests/sql/select.rs                |   7 +-
 datafusion/core/tests/window_fuzz.rs               |  13 ++-
 datafusion/physical-expr/src/lib.rs                |   4 +-
 datafusion/physical-expr/src/sort_expr.rs          |  78 ++++++++++++--
 datafusion/physical-expr/src/utils.rs              | 114 +++++++++++++++++++--
 datafusion/proto/src/physical_plan/mod.rs          |   1 -
 16 files changed, 356 insertions(+), 113 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/repartition.rs 
b/datafusion/core/src/physical_optimizer/repartition.rs
index 8557769c3..71274d177 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -321,6 +321,9 @@ fn init() {
 mod tests {
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+    use datafusion_physical_expr::{
+        make_sort_requirements_from_exprs, PhysicalSortRequirement,
+    };
 
     use super::*;
     use crate::datasource::listing::PartitionedFile;
@@ -1131,8 +1134,10 @@ mod tests {
         }
 
         // model that it requires the output ordering of its input
-        fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
-            vec![self.input.output_ordering()]
+        fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
+            vec![self
+                .output_ordering()
+                .map(make_sort_requirements_from_exprs)]
         }
 
         fn with_new_children(
diff --git a/datafusion/core/src/physical_optimizer/sort_enforcement.rs 
b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
index 9a87796bc..265c86cdf 100644
--- a/datafusion/core/src/physical_optimizer/sort_enforcement.rs
+++ b/datafusion/core/src/physical_optimizer/sort_enforcement.rs
@@ -47,7 +47,10 @@ use crate::physical_plan::{with_new_children_if_necessary, 
Distribution, Executi
 use arrow::datatypes::SchemaRef;
 use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
 use datafusion_common::{reverse_sort_options, DataFusionError};
-use datafusion_physical_expr::utils::{ordering_satisfy, 
ordering_satisfy_concrete};
+use datafusion_physical_expr::utils::{
+    make_sort_exprs_from_requirements, ordering_satisfy,
+    ordering_satisfy_requirement_concrete,
+};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 use itertools::{concat, izip};
 use std::iter::zip;
@@ -471,17 +474,20 @@ fn ensure_sorting(
         let physical_ordering = child.output_ordering();
         match (required_ordering, physical_ordering) {
             (Some(required_ordering), Some(physical_ordering)) => {
-                let is_ordering_satisfied = ordering_satisfy_concrete(
+                if !ordering_satisfy_requirement_concrete(
                     physical_ordering,
-                    required_ordering,
+                    &required_ordering,
                     || child.equivalence_properties(),
-                );
-                if !is_ordering_satisfied {
+                ) {
                     // Make sure we preserve the ordering requirements:
                     update_child_to_remove_unnecessary_sort(child, 
sort_onwards, &plan)?;
-                    let sort_expr = required_ordering.to_vec();
+                    let sort_expr = 
make_sort_exprs_from_requirements(&required_ordering);
                     add_sort_above(child, sort_expr)?;
-                    *sort_onwards = Some(ExecTree::new(child.clone(), idx, 
vec![]));
+                    if is_sort(child) {
+                        *sort_onwards = Some(ExecTree::new(child.clone(), idx, 
vec![]));
+                    } else {
+                        *sort_onwards = None;
+                    }
                 }
                 if let Some(tree) = sort_onwards {
                     // For window expressions, we can remove some sorts when 
we can
@@ -497,7 +503,8 @@ fn ensure_sorting(
             }
             (Some(required), None) => {
                 // Ordering requirement is not met, we should add a `SortExec` 
to the plan.
-                add_sort_above(child, required.to_vec())?;
+                let sort_expr = make_sort_exprs_from_requirements(&required);
+                add_sort_above(child, sort_expr)?;
                 *sort_onwards = Some(ExecTree::new(child.clone(), idx, 
vec![]));
             }
             (None, Some(_)) => {
@@ -592,7 +599,6 @@ fn analyze_window_sort_removal(
     };
 
     let mut first_should_reverse = None;
-    let mut physical_ordering_common = vec![];
     for sort_any in sort_tree.get_leaves() {
         let sort_output_ordering = sort_any.output_ordering();
         // Variable `sort_any` will either be a `SortExec` or a
@@ -609,11 +615,6 @@ fn analyze_window_sort_removal(
             DataFusionError::Plan("A SortExec should have output 
ordering".to_string())
         })?;
         if let Some(physical_ordering) = physical_ordering {
-            if physical_ordering_common.is_empty()
-                || physical_ordering.len() < physical_ordering_common.len()
-            {
-                physical_ordering_common = physical_ordering.to_vec();
-            }
             let (can_skip_sorting, should_reverse) = can_skip_sort(
                 window_expr[0].partition_by(),
                 required_ordering,
@@ -664,7 +665,6 @@ fn analyze_window_sort_removal(
                 new_child,
                 new_schema,
                 partition_keys.to_vec(),
-                Some(physical_ordering_common),
             )?) as _
         } else {
             Arc::new(WindowAggExec::try_new(
@@ -672,7 +672,6 @@ fn analyze_window_sort_removal(
                 new_child,
                 new_schema,
                 partition_keys.to_vec(),
-                Some(physical_ordering_common),
             )?) as _
         };
         return Ok(Some(PlanWithCorrespondingSort::new(new_plan)));
@@ -1889,7 +1888,6 @@ mod tests {
                 input.clone(),
                 input.schema(),
                 vec![],
-                Some(sort_exprs),
             )
             .unwrap(),
         )
diff --git a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs 
b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
index de9199ea8..623bafe09 100644
--- a/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
+++ b/datafusion/core/src/physical_plan/joins/sort_merge_join.rs
@@ -55,6 +55,9 @@ use crate::physical_plan::{
 };
 
 use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_physical_expr::{
+    make_sort_requirements_from_exprs, PhysicalSortRequirement,
+};
 
 /// join execution plan executes partitions in parallel and combines them into 
a set of
 /// partitions.
@@ -225,8 +228,11 @@ impl ExecutionPlan for SortMergeJoinExec {
         ]
     }
 
-    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
-        vec![Some(&self.left_sort_exprs), Some(&self.right_sort_exprs)]
+    fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
+        vec![
+            Some(make_sort_requirements_from_exprs(&self.left_sort_exprs)),
+            Some(make_sort_requirements_from_exprs(&self.right_sort_exprs)),
+        ]
     }
 
     fn output_partitioning(&self) -> Partitioning {
diff --git a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs 
b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
index 3af983d8f..5a249e433 100644
--- a/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
+++ b/datafusion/core/src/physical_plan/joins/symmetric_hash_join.rs
@@ -46,6 +46,9 @@ use hashbrown::{raw::RawTable, HashSet};
 
 use datafusion_common::{utils::bisect, ScalarValue};
 use datafusion_physical_expr::intervals::{ExprIntervalGraph, Interval};
+use datafusion_physical_expr::{
+    make_sort_requirements_from_exprs, PhysicalSortRequirement,
+};
 
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
@@ -399,11 +402,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
         self.schema.clone()
     }
 
-    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
-        vec![
-            Some(&self.left_required_sort_exprs),
-            Some(&self.right_required_sort_exprs),
-        ]
+    fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
+        let left_required =
+            make_sort_requirements_from_exprs(&self.left_required_sort_exprs);
+        let right_required =
+            make_sort_requirements_from_exprs(&self.right_required_sort_exprs);
+        vec![Some(left_required), Some(right_required)]
     }
 
     fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
diff --git a/datafusion/core/src/physical_plan/mod.rs 
b/datafusion/core/src/physical_plan/mod.rs
index c59dd0c62..9815d9491 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -142,7 +142,7 @@ pub trait ExecutionPlan: Debug + Send + Sync {
     /// NOTE that checking `!is_empty()` does **not** check for a
     /// required input ordering. Instead, the correct check is that at
     /// least one entry must be `Some`
-    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
+    fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
         vec![None; self.children().len()]
     }
 
@@ -591,11 +591,11 @@ impl Distribution {
 
 use datafusion_physical_expr::expressions::Column;
 pub use datafusion_physical_expr::window::WindowExpr;
-use datafusion_physical_expr::EquivalenceProperties;
 use datafusion_physical_expr::{
     expr_list_eq_strict_order, normalize_expr_with_equivalence_properties,
 };
 pub use datafusion_physical_expr::{AggregateExpr, PhysicalExpr};
+use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
 
 /// Applies an optional projection to a [`SchemaRef`], returning the
 /// projected schema
diff --git a/datafusion/core/src/physical_plan/planner.rs 
b/datafusion/core/src/physical_plan/planner.rs
index 51653450a..4b21a9bd7 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -577,33 +577,6 @@ impl DefaultPhysicalPlanner {
 
                     let logical_input_schema = input.schema();
 
-                    let physical_sort_keys = if sort_keys.is_empty() {
-                        None
-                    } else {
-                        let physical_input_schema = input_exec.schema();
-                        let sort_keys = sort_keys
-                            .iter()
-                            .map(|(e, _)| match e {
-                                Expr::Sort(expr::Sort {
-                                    expr,
-                                    asc,
-                                    nulls_first,
-                                }) => create_physical_sort_expr(
-                                    expr,
-                                    logical_input_schema,
-                                    &physical_input_schema,
-                                    SortOptions {
-                                        descending: !*asc,
-                                        nulls_first: *nulls_first,
-                                    },
-                                    session_state.execution_props(),
-                                ),
-                                _ => unreachable!(),
-                            })
-                            .collect::<Result<Vec<_>>>()?;
-                        Some(sort_keys)
-                    };
-
                     let physical_input_schema = input_exec.schema();
                     let window_expr = window_expr
                         .iter()
@@ -628,7 +601,6 @@ impl DefaultPhysicalPlanner {
                             input_exec,
                             physical_input_schema,
                             physical_partition_keys,
-                            physical_sort_keys,
                         )?)
                     } else {
                         Arc::new(WindowAggExec::try_new(
@@ -636,7 +608,6 @@ impl DefaultPhysicalPlanner {
                             input_exec,
                             physical_input_schema,
                             physical_partition_keys,
-                            physical_sort_keys,
                         )?)
                     })
                 }
diff --git a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs 
b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
index 7ef4d3bf8..edacae405 100644
--- a/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs
@@ -46,7 +46,9 @@ use crate::physical_plan::{
     Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
     SendableRecordBatchStream, Statistics,
 };
-use datafusion_physical_expr::EquivalenceProperties;
+use datafusion_physical_expr::{
+    make_sort_requirements_from_exprs, EquivalenceProperties, 
PhysicalSortRequirement,
+};
 
 /// Sort preserving merge execution plan
 ///
@@ -125,12 +127,16 @@ impl ExecutionPlan for SortPreservingMergeExec {
         vec![Distribution::UnspecifiedDistribution]
     }
 
-    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
-        vec![Some(&self.expr)]
+    fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
+        vec![Some(make_sort_requirements_from_exprs(&self.expr))]
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        Some(&self.expr)
+        self.input.output_ordering()
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![true]
     }
 
     fn equivalence_properties(&self) -> EquivalenceProperties {
diff --git 
a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs 
b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index fa276c423..4b01d3b4e 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -50,11 +50,14 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
+use crate::physical_plan::windows::calc_requirements;
 use datafusion_physical_expr::window::{
     PartitionBatchState, PartitionBatches, PartitionKey, 
PartitionWindowAggStates,
     WindowAggState, WindowState,
 };
-use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
+use datafusion_physical_expr::{
+    EquivalenceProperties, PhysicalExpr, PhysicalSortRequirement,
+};
 use indexmap::IndexMap;
 use log::debug;
 
@@ -71,8 +74,6 @@ pub struct BoundedWindowAggExec {
     input_schema: SchemaRef,
     /// Partition Keys
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
-    /// Sort Keys
-    pub sort_keys: Option<Vec<PhysicalSortExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -84,7 +85,6 @@ impl BoundedWindowAggExec {
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
         partition_keys: Vec<Arc<dyn PhysicalExpr>>,
-        sort_keys: Option<Vec<PhysicalSortExpr>>,
     ) -> Result<Self> {
         let schema = create_schema(&input_schema, &window_expr)?;
         let schema = Arc::new(schema);
@@ -94,7 +94,6 @@ impl BoundedWindowAggExec {
             schema,
             input_schema,
             partition_keys,
-            sort_keys,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
@@ -123,7 +122,7 @@ impl BoundedWindowAggExec {
         let mut result = vec![];
         // All window exprs have the same partition by, so we just use the 
first one:
         let partition_by = self.window_expr()[0].partition_by();
-        let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]);
+        let sort_keys = self.input.output_ordering().unwrap_or(&[]);
         for item in partition_by {
             if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
                 result.push(a.clone());
@@ -167,9 +166,11 @@ impl ExecutionPlan for BoundedWindowAggExec {
         self.input().output_ordering()
     }
 
-    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
-        let sort_keys = self.sort_keys.as_deref();
-        vec![sort_keys]
+    fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
+        let partition_bys = self.window_expr()[0].partition_by();
+        let order_keys = self.window_expr()[0].order_by();
+        let requirements = calc_requirements(partition_bys, order_keys);
+        vec![requirements]
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
@@ -177,7 +178,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
             debug!("No partition defined for BoundedWindowAggExec!!!");
             vec![Distribution::SinglePartition]
         } else {
-            //TODO support PartitionCollections if there is no common 
partition columns in the window_expr
             vec![Distribution::HashPartitioned(self.partition_keys.clone())]
         }
     }
@@ -199,7 +199,6 @@ impl ExecutionPlan for BoundedWindowAggExec {
             children[0].clone(),
             self.input_schema.clone(),
             self.partition_keys.clone(),
-            self.sort_keys.clone(),
         )?))
     }
 
diff --git a/datafusion/core/src/physical_plan/windows/mod.rs 
b/datafusion/core/src/physical_plan/windows/mod.rs
index bdb9aa326..f7f9bb76b 100644
--- a/datafusion/core/src/physical_plan/windows/mod.rs
+++ b/datafusion/core/src/physical_plan/windows/mod.rs
@@ -46,6 +46,7 @@ pub use bounded_window_agg_exec::BoundedWindowAggExec;
 pub use datafusion_physical_expr::window::{
     BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
 };
+use datafusion_physical_expr::PhysicalSortRequirement;
 pub use window_agg_exec::WindowAggExec;
 
 /// Create a physical expression for window function
@@ -187,6 +188,30 @@ fn create_built_in_window_expr(
     })
 }
 
+pub(crate) fn calc_requirements(
+    partition_by_exprs: &[Arc<dyn PhysicalExpr>],
+    orderby_sort_exprs: &[PhysicalSortExpr],
+) -> Option<Vec<PhysicalSortRequirement>> {
+    let mut sort_reqs = vec![];
+    for partition_by in partition_by_exprs {
+        sort_reqs.push(PhysicalSortRequirement {
+            expr: partition_by.clone(),
+            options: None,
+        });
+    }
+    for PhysicalSortExpr { expr, options } in orderby_sort_exprs {
+        let contains = sort_reqs.iter().any(|e| expr.eq(&e.expr));
+        if !contains {
+            sort_reqs.push(PhysicalSortRequirement {
+                expr: expr.clone(),
+                options: Some(*options),
+            });
+        }
+    }
+    // Convert empty result to None. Otherwise wrap result inside Some()
+    (!sort_reqs.is_empty()).then_some(sort_reqs)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -198,6 +223,7 @@ mod tests {
     use crate::test::exec::{assert_strong_count_converges_to_zero, 
BlockingExec};
     use crate::test::{self, assert_is_pending};
     use arrow::array::*;
+    use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, SchemaRef};
     use arrow::record_batch::RecordBatch;
     use datafusion_common::cast::as_primitive_array;
@@ -210,6 +236,79 @@ mod tests {
         Ok((csv, schema))
     }
 
+    fn create_test_schema2() -> Result<SchemaRef> {
+        let a = Field::new("a", DataType::Int32, true);
+        let b = Field::new("b", DataType::Int32, true);
+        let c = Field::new("c", DataType::Int32, true);
+        let d = Field::new("d", DataType::Int32, true);
+        let schema = Arc::new(Schema::new(vec![a, b, c, d]));
+        Ok(schema)
+    }
+
+    #[tokio::test]
+    async fn test_calc_requirements() -> Result<()> {
+        let schema = create_test_schema2()?;
+        let test_data = vec![
+            // PARTITION BY a, ORDER BY b ASC NULLS FIRST
+            (
+                vec!["a"],
+                vec![("b", true, true)],
+                vec![("a", None), ("b", Some((true, true)))],
+            ),
+            // PARTITION BY a, ORDER BY a ASC NULLS FIRST
+            (vec!["a"], vec![("a", true, true)], vec![("a", None)]),
+            // PARTITION BY a, ORDER BY b ASC NULLS FIRST, c DESC NULLS LAST
+            (
+                vec!["a"],
+                vec![("b", true, true), ("c", false, false)],
+                vec![
+                    ("a", None),
+                    ("b", Some((true, true))),
+                    ("c", Some((false, false))),
+                ],
+            ),
+            // PARTITION BY a, c, ORDER BY b ASC NULLS FIRST, c DESC NULLS LAST
+            (
+                vec!["a", "c"],
+                vec![("b", true, true), ("c", false, false)],
+                vec![("a", None), ("c", None), ("b", Some((true, true)))],
+            ),
+        ];
+        for (pb_params, ob_params, expected_params) in test_data {
+            let mut partitionbys = vec![];
+            for col_name in pb_params {
+                partitionbys.push(col(col_name, &schema)?);
+            }
+
+            let mut orderbys = vec![];
+            for (col_name, descending, nulls_first) in ob_params {
+                let expr = col(col_name, &schema)?;
+                let options = SortOptions {
+                    descending,
+                    nulls_first,
+                };
+                orderbys.push(PhysicalSortExpr { expr, options });
+            }
+
+            let mut expected: Option<Vec<PhysicalSortRequirement>> = None;
+            for (col_name, reqs) in expected_params {
+                let options = reqs.map(|(descending, nulls_first)| SortOptions 
{
+                    descending,
+                    nulls_first,
+                });
+                let expr = col(col_name, &schema)?;
+                let res = PhysicalSortRequirement { expr, options };
+                if let Some(expected) = &mut expected {
+                    expected.push(res);
+                } else {
+                    expected = Some(vec![res]);
+                }
+            }
+            assert_eq!(calc_requirements(&partitionbys, &orderbys), expected);
+        }
+        Ok(())
+    }
+
     #[tokio::test]
     async fn window_function_with_udaf() -> Result<()> {
         #[derive(Debug)]
@@ -269,7 +368,6 @@ mod tests {
             input,
             schema.clone(),
             vec![],
-            None,
         )?);
 
         let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
@@ -323,7 +421,6 @@ mod tests {
             input,
             schema.clone(),
             vec![],
-            None,
         )?);
 
         let result: Vec<RecordBatch> = collect(window_exec, task_ctx).await?;
@@ -371,7 +468,6 @@ mod tests {
             blocking_exec,
             schema,
             vec![],
-            None,
         )?);
 
         let fut = collect(window_agg_exec, task_ctx);
diff --git a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs 
b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
index a667f0a3c..29d5c4dc8 100644
--- a/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/window_agg_exec.rs
@@ -24,6 +24,7 @@ use crate::physical_plan::expressions::PhysicalSortExpr;
 use crate::physical_plan::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet,
 };
+use crate::physical_plan::windows::calc_requirements;
 use crate::physical_plan::{
     ColumnStatistics, DisplayFormatType, Distribution, EquivalenceProperties,
     ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
@@ -39,6 +40,7 @@ use arrow::{
     record_batch::RecordBatch,
 };
 use datafusion_common::DataFusionError;
+use datafusion_physical_expr::PhysicalSortRequirement;
 use futures::stream::Stream;
 use futures::{ready, StreamExt};
 use log::debug;
@@ -61,8 +63,6 @@ pub struct WindowAggExec {
     input_schema: SchemaRef,
     /// Partition Keys
     pub partition_keys: Vec<Arc<dyn PhysicalExpr>>,
-    /// Sort Keys
-    pub sort_keys: Option<Vec<PhysicalSortExpr>>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
 }
@@ -74,7 +74,6 @@ impl WindowAggExec {
         input: Arc<dyn ExecutionPlan>,
         input_schema: SchemaRef,
         partition_keys: Vec<Arc<dyn PhysicalExpr>>,
-        sort_keys: Option<Vec<PhysicalSortExpr>>,
     ) -> Result<Self> {
         let schema = create_schema(&input_schema, &window_expr)?;
         let schema = Arc::new(schema);
@@ -85,7 +84,6 @@ impl WindowAggExec {
             schema,
             input_schema,
             partition_keys,
-            sort_keys,
             metrics: ExecutionPlanMetricsSet::new(),
         })
     }
@@ -114,7 +112,7 @@ impl WindowAggExec {
         let mut result = vec![];
         // All window exprs have the same partition by, so we just use the 
first one:
         let partition_by = self.window_expr()[0].partition_by();
-        let sort_keys = self.sort_keys.as_deref().unwrap_or(&[]);
+        let sort_keys = self.input.output_ordering().unwrap_or(&[]);
         for item in partition_by {
             if let Some(a) = sort_keys.iter().find(|&e| e.expr.eq(item)) {
                 result.push(a.clone());
@@ -172,9 +170,11 @@ impl ExecutionPlan for WindowAggExec {
         vec![true]
     }
 
-    fn required_input_ordering(&self) -> Vec<Option<&[PhysicalSortExpr]>> {
-        let sort_keys = self.sort_keys.as_deref();
-        vec![sort_keys]
+    fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
+        let partition_bys = self.window_expr()[0].partition_by();
+        let order_keys = self.window_expr()[0].order_by();
+        let requirements = calc_requirements(partition_bys, order_keys);
+        vec![requirements]
     }
 
     fn required_input_distribution(&self) -> Vec<Distribution> {
@@ -200,7 +200,6 @@ impl ExecutionPlan for WindowAggExec {
             children[0].clone(),
             self.input_schema.clone(),
             self.partition_keys.clone(),
-            self.sort_keys.clone(),
         )?))
     }
 
diff --git a/datafusion/core/tests/sql/select.rs 
b/datafusion/core/tests/sql/select.rs
index 289247684..b6017f826 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -670,7 +670,7 @@ async fn sort_on_window_null_string() -> Result<()> {
     ])
     .unwrap();
 
-    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(2));
+    let ctx = 
SessionContext::with_config(SessionConfig::new().with_target_partitions(1));
     ctx.register_batch("test", batch)?;
 
     let sql =
@@ -689,7 +689,8 @@ async fn sort_on_window_null_string() -> Result<()> {
     ];
     assert_batches_eq!(expected, &actual);
 
-    let sql = "SELECT d2, row_number() OVER (partition by d2) as rn1 FROM 
test";
+    let sql =
+        "SELECT d2, row_number() OVER (partition by d2) as rn1 FROM test ORDER 
BY d2 asc";
     let actual = execute_to_batches(&ctx, sql).await;
     // NULLS LAST
     let expected = vec![
@@ -704,7 +705,7 @@ async fn sort_on_window_null_string() -> Result<()> {
     assert_batches_eq!(expected, &actual);
 
     let sql =
-        "SELECT d2, row_number() OVER (partition by d2 order by d2 desc) as 
rn1 FROM test";
+        "SELECT d2, row_number() OVER (partition by d2 order by d2 desc) as 
rn1 FROM test ORDER BY d2 desc";
 
     let actual = execute_to_batches(&ctx, sql).await;
     // NULLS FIRST
diff --git a/datafusion/core/tests/window_fuzz.rs 
b/datafusion/core/tests/window_fuzz.rs
index a71aab280..179bada53 100644
--- a/datafusion/core/tests/window_fuzz.rs
+++ b/datafusion/core/tests/window_fuzz.rs
@@ -330,7 +330,9 @@ async fn run_window_test(
 
     let concat_input_record = concat_batches(&schema, &input1).unwrap();
     let exec1 = Arc::new(
-        MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), 
None).unwrap(),
+        MemoryExec::try_new(&[vec![concat_input_record]], schema.clone(), None)
+            .unwrap()
+            .with_sort_information(sort_keys.clone()),
     );
     let usual_window_exec = Arc::new(
         WindowAggExec::try_new(
@@ -347,12 +349,14 @@ async fn run_window_test(
             exec1,
             schema.clone(),
             vec![],
-            Some(sort_keys.clone()),
         )
         .unwrap(),
     );
-    let exec2 =
-        Arc::new(MemoryExec::try_new(&[input1.clone()], schema.clone(), 
None).unwrap());
+    let exec2 = Arc::new(
+        MemoryExec::try_new(&[input1.clone()], schema.clone(), None)
+            .unwrap()
+            .with_sort_information(sort_keys),
+    );
     let running_window_exec = Arc::new(
         BoundedWindowAggExec::try_new(
             vec![create_window_expr(
@@ -368,7 +372,6 @@ async fn run_window_test(
             exec2,
             schema.clone(),
             vec![],
-            Some(sort_keys),
         )
         .unwrap(),
     );
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index 56ca0824f..a1698fe07 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -53,7 +53,9 @@ pub use equivalence::EquivalentClass;
 pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, 
PhysicalExprRef};
 pub use planner::create_physical_expr;
 pub use scalar_function::ScalarFunctionExpr;
-pub use sort_expr::PhysicalSortExpr;
+pub use sort_expr::{
+    make_sort_requirements_from_exprs, PhysicalSortExpr, 
PhysicalSortRequirement,
+};
 pub use utils::{
     expr_list_eq_any_order, expr_list_eq_strict_order,
     normalize_expr_with_equivalence_properties, 
normalize_out_expr_with_alias_schema,
diff --git a/datafusion/physical-expr/src/sort_expr.rs 
b/datafusion/physical-expr/src/sort_expr.rs
index f8172dabf..08bd394e6 100644
--- a/datafusion/physical-expr/src/sort_expr.rs
+++ b/datafusion/physical-expr/src/sort_expr.rs
@@ -41,14 +41,7 @@ impl PartialEq for PhysicalSortExpr {
 
 impl std::fmt::Display for PhysicalSortExpr {
     fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        let opts_string = match (self.options.descending, 
self.options.nulls_first) {
-            (true, true) => "DESC",
-            (true, false) => "DESC NULLS LAST",
-            (false, true) => "ASC",
-            (false, false) => "ASC NULLS LAST",
-        };
-
-        write!(f, "{} {}", self.expr, opts_string)
+        write!(f, "{} {}", self.expr, to_str(&self.options))
     }
 }
 
@@ -69,4 +62,73 @@ impl PhysicalSortExpr {
             options: Some(self.options),
         })
     }
+
+    /// Check whether sort expression satisfies `PhysicalSortRequirement`.
+    // If sort options is Some in `PhysicalSortRequirement`, `expr` and 
`options` field are compared for equality.
+    // If sort options is None in `PhysicalSortRequirement`, only `expr` is 
compared for equality.
+    pub fn satisfy(&self, requirement: &PhysicalSortRequirement) -> bool {
+        self.expr.eq(&requirement.expr)
+            && requirement
+                .options
+                .map_or(true, |opts| self.options == opts)
+    }
+}
+
+/// Represents sort requirement associated with a plan
+#[derive(Clone, Debug)]
+pub struct PhysicalSortRequirement {
+    /// Physical expression representing the column to sort
+    pub expr: Arc<dyn PhysicalExpr>,
+    /// Option to specify how the given column should be sorted.
+    /// If unspecified, there is no constraint on sort options.
+    pub options: Option<SortOptions>,
+}
+
+impl From<PhysicalSortExpr> for PhysicalSortRequirement {
+    fn from(value: PhysicalSortExpr) -> Self {
+        Self {
+            expr: value.expr,
+            options: Some(value.options),
+        }
+    }
+}
+
+impl PartialEq for PhysicalSortRequirement {
+    fn eq(&self, other: &PhysicalSortRequirement) -> bool {
+        self.options == other.options && self.expr.eq(&other.expr)
+    }
+}
+
+impl std::fmt::Display for PhysicalSortRequirement {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let opts_string = self.options.as_ref().map_or("NA", to_str);
+        write!(f, "{} {}", self.expr, opts_string)
+    }
+}
+
+impl PhysicalSortRequirement {
+    /// Returns whether this requirement is equal or more specific than 
`other`.
+    pub fn compatible(&self, other: &PhysicalSortRequirement) -> bool {
+        self.expr.eq(&other.expr)
+            && other.options.map_or(true, |other_opts| {
+                self.options.map_or(false, |opts| opts == other_opts)
+            })
+    }
+}
+
+pub fn make_sort_requirements_from_exprs(
+    ordering: &[PhysicalSortExpr],
+) -> Vec<PhysicalSortRequirement> {
+    ordering.iter().map(|e| e.clone().into()).collect()
+}
+
+/// Returns the SQL string representation of the given [SortOptions] object.
+#[inline]
+fn to_str(options: &SortOptions) -> &str {
+    match (options.descending, options.nulls_first) {
+        (true, true) => "DESC",
+        (true, false) => "DESC NULLS LAST",
+        (false, true) => "ASC",
+        (false, false) => "ASC NULLS LAST",
+    }
 }
diff --git a/datafusion/physical-expr/src/utils.rs 
b/datafusion/physical-expr/src/utils.rs
index 7c8c94c7d..cd4ac6ff3 100644
--- a/datafusion/physical-expr/src/utils.rs
+++ b/datafusion/physical-expr/src/utils.rs
@@ -17,11 +17,14 @@
 
 use crate::equivalence::EquivalentClass;
 use crate::expressions::{BinaryExpr, Column, UnKnownColumn};
-use crate::{EquivalenceProperties, PhysicalExpr, PhysicalSortExpr};
+use crate::{
+    EquivalenceProperties, PhysicalExpr, PhysicalSortExpr, 
PhysicalSortRequirement,
+};
 use arrow::datatypes::SchemaRef;
 use datafusion_common::Result;
 use datafusion_expr::Operator;
 
+use arrow_schema::SortOptions;
 use datafusion_common::tree_node::{
     Transformed, TreeNode, TreeNodeRewriter, VisitRecursion,
 };
@@ -199,6 +202,24 @@ pub fn normalize_sort_expr_with_equivalence_properties(
     }
 }
 
+pub fn normalize_sort_requirement_with_equivalence_properties(
+    sort_requirement: PhysicalSortRequirement,
+    eq_properties: &[EquivalentClass],
+) -> PhysicalSortRequirement {
+    let normalized_expr = normalize_expr_with_equivalence_properties(
+        sort_requirement.expr.clone(),
+        eq_properties,
+    );
+    if sort_requirement.expr.ne(&normalized_expr) {
+        PhysicalSortRequirement {
+            expr: normalized_expr,
+            options: sort_requirement.options,
+        }
+    } else {
+        sort_requirement
+    }
+}
+
 /// Checks whether given ordering requirements are satisfied by provided 
[PhysicalSortExpr]s.
 pub fn ordering_satisfy<F: FnOnce() -> EquivalenceProperties>(
     provided: Option<&[PhysicalSortExpr]>,
@@ -224,31 +245,102 @@ pub fn ordering_satisfy_concrete<F: FnOnce() -> 
EquivalenceProperties>(
     } else if required
         .iter()
         .zip(provided.iter())
-        .all(|(order1, order2)| order1.eq(order2))
+        .all(|(req, given)| req.eq(given))
     {
         true
     } else if let eq_classes @ [_, ..] = equal_properties().classes() {
-        let normalized_required_exprs = required
+        required
             .iter()
             .map(|e| {
                 normalize_sort_expr_with_equivalence_properties(e.clone(), 
eq_classes)
             })
-            .collect::<Vec<_>>();
-        let normalized_provided_exprs = provided
+            .zip(provided.iter().map(|e| {
+                normalize_sort_expr_with_equivalence_properties(e.clone(), 
eq_classes)
+            }))
+            .all(|(req, given)| req.eq(&given))
+    } else {
+        false
+    }
+}
+
+/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
+/// provided [`PhysicalSortExpr`]s.
+pub fn ordering_satisfy_requirement<F: FnOnce() -> EquivalenceProperties>(
+    provided: Option<&[PhysicalSortExpr]>,
+    required: Option<&[PhysicalSortRequirement]>,
+    equal_properties: F,
+) -> bool {
+    match (provided, required) {
+        (_, None) => true,
+        (None, Some(_)) => false,
+        (Some(provided), Some(required)) => {
+            ordering_satisfy_requirement_concrete(provided, required, 
equal_properties)
+        }
+    }
+}
+
+/// Checks whether the given [`PhysicalSortRequirement`]s are satisfied by the
+/// provided [`PhysicalSortExpr`]s.
+pub fn ordering_satisfy_requirement_concrete<F: FnOnce() -> 
EquivalenceProperties>(
+    provided: &[PhysicalSortExpr],
+    required: &[PhysicalSortRequirement],
+    equal_properties: F,
+) -> bool {
+    if required.len() > provided.len() {
+        false
+    } else if required
+        .iter()
+        .zip(provided.iter())
+        .all(|(req, given)| given.satisfy(req))
+    {
+        true
+    } else if let eq_classes @ [_, ..] = equal_properties().classes() {
+        required
             .iter()
             .map(|e| {
-                normalize_sort_expr_with_equivalence_properties(e.clone(), 
eq_classes)
+                normalize_sort_requirement_with_equivalence_properties(
+                    e.clone(),
+                    eq_classes,
+                )
             })
-            .collect::<Vec<_>>();
-        normalized_required_exprs
-            .iter()
-            .zip(normalized_provided_exprs.iter())
-            .all(|(order1, order2)| order1.eq(order2))
+            .zip(provided.iter().map(|e| {
+                normalize_sort_expr_with_equivalence_properties(e.clone(), 
eq_classes)
+            }))
+            .all(|(req, given)| given.satisfy(&req))
     } else {
         false
     }
 }
 
+/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
+/// for each entry in the input. If required ordering is None for an entry
+/// default ordering `ASC, NULLS LAST` if given.
+pub fn make_sort_exprs_from_requirements(
+    required: &[PhysicalSortRequirement],
+) -> Vec<PhysicalSortExpr> {
+    required
+        .iter()
+        .map(|requirement| {
+            if let Some(options) = requirement.options {
+                PhysicalSortExpr {
+                    expr: requirement.expr.clone(),
+                    options,
+                }
+            } else {
+                PhysicalSortExpr {
+                    expr: requirement.expr.clone(),
+                    options: SortOptions {
+                        // By default, create sort key with ASC is true and 
NULLS LAST to be consistent with
+                        // PostgreSQL's rule: 
https://www.postgresql.org/docs/current/queries-order.html
+                        descending: false,
+                        nulls_first: false,
+                    },
+                }
+            }
+        })
+        .collect()
+}
+
 #[derive(Clone, Debug)]
 pub struct ExprTreeNode<T> {
     expr: Arc<dyn PhysicalExpr>,
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index 92986b0b3..7711b3c96 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -334,7 +334,6 @@ impl AsExecutionPlan for PhysicalPlanNode {
                     input,
                     Arc::new((&input_schema).try_into()?),
                     vec![],
-                    None,
                 )?))
             }
             PhysicalPlanType::Aggregate(hash_agg) => {


Reply via email to