This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6a02384d19 Deprecate `PhysicalSortRequirement::from_sort_exprs` and 
`PhysicalSortRequirement::to_sort_exprs` (#13222)
6a02384d19 is described below

commit 6a02384d198b3420b96892cc7a72fc202770375c
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Nov 6 06:24:40 2024 -0500

    Deprecate `PhysicalSortRequirement::from_sort_exprs` and 
`PhysicalSortRequirement::to_sort_exprs` (#13222)
    
    * Deprecate `PhysicalSortRequirement::from_sort_exprs` and 
`PhysicalSortRequirement::to_sort_exprs`
    
    * Update for API changes
    
    * fix clippy
    
    * fmt
---
 .../src/physical_optimizer/enforce_distribution.rs |  5 +---
 .../core/src/physical_optimizer/enforce_sorting.rs |  8 +++---
 .../core/src/physical_optimizer/sort_pushdown.rs   | 27 ++++++++++----------
 .../core/src/physical_optimizer/test_utils.rs      |  8 ++----
 .../src/physical_optimizer/update_aggr_exprs.rs    | 10 +++-----
 datafusion/core/src/physical_optimizer/utils.rs    |  5 ++--
 datafusion/physical-expr-common/src/sort_expr.rs   | 29 +++++-----------------
 datafusion/physical-expr/src/equivalence/class.rs  |  4 +--
 .../physical-expr/src/equivalence/properties.rs    | 12 ++++-----
 .../physical-optimizer/src/output_requirements.rs  |  6 ++---
 datafusion/physical-plan/src/aggregates/mod.rs     |  6 ++---
 .../physical-plan/src/joins/sort_merge_join.rs     | 10 +++-----
 .../physical-plan/src/joins/symmetric_hash_join.rs | 10 ++++----
 .../src/sorts/sort_preserving_merge.rs             |  5 +---
 datafusion/physical-plan/src/windows/mod.rs        |  5 ++--
 datafusion/proto/src/physical_plan/mod.rs          | 10 +++-----
 16 files changed, 61 insertions(+), 99 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 6863978610..4bbb995c36 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1423,7 +1423,6 @@ pub(crate) mod tests {
     use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
     use datafusion_physical_expr::{
         expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr,
-        PhysicalSortRequirement,
     };
     use datafusion_physical_expr_common::sort_expr::LexRequirement;
     use datafusion_physical_plan::PlanProperties;
@@ -1496,9 +1495,7 @@ pub(crate) mod tests {
             if self.expr.is_empty() {
                 vec![None]
             } else {
-                vec![Some(PhysicalSortRequirement::from_sort_exprs(
-                    self.expr.iter(),
-                ))]
+                vec![Some(LexRequirement::from(self.expr.clone()))]
             }
         }
 
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index adc3d7cac1..636d52ccc9 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -61,7 +61,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, 
InputOrderMode};
 
 use datafusion_common::plan_err;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_physical_expr::{Partitioning, PhysicalSortRequirement};
+use datafusion_physical_expr::Partitioning;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
@@ -221,7 +221,7 @@ fn replace_with_partial_sort(
         // here we're trying to find the common prefix for sorted columns that 
is required for the
         // sort and already satisfied by the given ordering
         let child_eq_properties = child.equivalence_properties();
-        let sort_req = 
PhysicalSortRequirement::from_sort_exprs(sort_plan.expr());
+        let sort_req = LexRequirement::from(sort_plan.expr().clone());
 
         let mut common_prefix_length = 0;
         while child_eq_properties.ordering_satisfy_requirement(&LexRequirement 
{
@@ -275,8 +275,8 @@ fn parallelize_sorts(
     {
         // Take the initial sort expressions and requirements
         let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
-        let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs);
-        let sort_exprs = LexOrdering::new(sort_exprs.to_vec());
+        let sort_reqs = LexRequirement::from(sort_exprs.clone());
+        let sort_exprs = sort_exprs.clone();
 
         // If there is a connection between a `CoalescePartitionsExec` and a
         // global sort that satisfy the requirements (i.e. intermediate
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs 
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index e231e719b2..42d682169d 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -94,7 +94,8 @@ fn pushdown_sorts_helper(
     if is_sort(plan) {
         let required_ordering = plan
             .output_ordering()
-            .map(PhysicalSortRequirement::from_sort_exprs)
+            .cloned()
+            .map(LexRequirement::from)
             .unwrap_or_default();
         if !satisfy_parent {
             // Make sure this `SortExec` satisfies parent requirements:
@@ -180,11 +181,12 @@ fn pushdown_requirement_to_children(
             RequirementsCompatibility::NonCompatible => Ok(None),
         }
     } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
-        let sort_req = PhysicalSortRequirement::from_sort_exprs(
+        let sort_req = LexRequirement::from(
             sort_exec
                 .properties()
                 .output_ordering()
-                .unwrap_or(&LexOrdering::default()),
+                .cloned()
+                .unwrap_or(LexOrdering::default()),
         );
         if sort_exec
             .properties()
@@ -205,10 +207,11 @@ fn pushdown_requirement_to_children(
             .iter()
             .all(|maintain| *maintain)
     {
-        let output_req = PhysicalSortRequirement::from_sort_exprs(
+        let output_req = LexRequirement::from(
             plan.properties()
                 .output_ordering()
-                .unwrap_or(&LexOrdering::default()),
+                .cloned()
+                .unwrap_or(LexOrdering::default()),
         );
         // Push down through operator with fetch when:
         // - requirement is aligned with output ordering
@@ -227,14 +230,12 @@ fn pushdown_requirement_to_children(
     } else if is_union(plan) {
         // UnionExec does not have real sort requirements for its input. Here 
we change the adjusted_request_ordering to UnionExec's output ordering and
         // propagate the sort requirements down to correct the unnecessary 
descendant SortExec under the UnionExec
-        let req = (!parent_required.is_empty())
-            .then(|| LexRequirement::new(parent_required.to_vec()));
+        let req = (!parent_required.is_empty()).then(|| 
parent_required.clone());
         Ok(Some(vec![req; plan.children().len()]))
     } else if let Some(smj) = 
plan.as_any().downcast_ref::<SortMergeJoinExec>() {
         // If the current plan is SortMergeJoinExec
         let left_columns_len = smj.left().schema().fields().len();
-        let parent_required_expr =
-            
PhysicalSortRequirement::to_sort_exprs(parent_required.iter().cloned());
+        let parent_required_expr = LexOrdering::from(parent_required.clone());
         match expr_source_side(
             parent_required_expr.as_ref(),
             smj.join_type(),
@@ -251,8 +252,7 @@ fn pushdown_requirement_to_children(
                     smj.schema().fields.len() - 
smj.right().schema().fields.len();
                 let new_right_required =
                     shift_right_required(parent_required, right_offset)?;
-                let new_right_required_expr =
-                    PhysicalSortRequirement::to_sort_exprs(new_right_required);
+                let new_right_required_expr = 
LexOrdering::from(new_right_required);
                 try_pushdown_requirements_to_join(
                     smj,
                     parent_required,
@@ -278,8 +278,7 @@ fn pushdown_requirement_to_children(
         // Pushing down is not beneficial
         Ok(None)
     } else if is_sort_preserving_merge(plan) {
-        let new_ordering =
-            PhysicalSortRequirement::to_sort_exprs(parent_required.to_vec());
+        let new_ordering = LexOrdering::from(parent_required.clone());
         let mut spm_eqs = plan.equivalence_properties().clone();
         // Sort preserving merge will have new ordering, one requirement above 
is pushed down to its below.
         spm_eqs = spm_eqs.with_reorder(new_ordering);
@@ -412,7 +411,7 @@ fn try_pushdown_requirements_to_join(
     let should_pushdown = 
smj_eqs.ordering_satisfy_requirement(parent_required);
     Ok(should_pushdown.then(|| {
         let mut required_input_ordering = smj.required_input_ordering();
-        let new_req = 
Some(PhysicalSortRequirement::from_sort_exprs(sort_expr));
+        let new_req = Some(LexRequirement::from(sort_expr.clone()));
         match push_side {
             JoinSide::Left => {
                 required_input_ordering[0] = new_req;
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs 
b/datafusion/core/src/physical_optimizer/test_utils.rs
index bdf16300ea..88bb0b6fef 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -56,9 +56,7 @@ use datafusion_physical_plan::{
 
 use async_trait::async_trait;
 use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_physical_expr_common::sort_expr::{
-    LexOrdering, LexRequirement, PhysicalSortRequirement,
-};
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 
 async fn register_current_csv(
     ctx: &SessionContext,
@@ -419,9 +417,7 @@ impl ExecutionPlan for RequirementsTestExec {
     }
 
     fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
-        let requirement = PhysicalSortRequirement::from_sort_exprs(
-            self.required_input_ordering.as_ref().iter(),
-        );
+        let requirement = 
LexRequirement::from(self.required_input_ordering.clone());
         vec![Some(requirement)]
     }
 
diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs 
b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
index c9363b00e1..d563d0c56d 100644
--- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
+++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
@@ -27,7 +27,7 @@ use 
datafusion_physical_expr::aggregate::AggregateFunctionExpr;
 use datafusion_physical_expr::{
     reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement,
 };
-use datafusion_physical_expr_common::sort_expr::LexRequirement;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::aggregates::concat_slices;
 use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
@@ -139,12 +139,10 @@ fn try_convert_aggregate_if_better(
     aggr_exprs
         .into_iter()
         .map(|aggr_expr| {
-            let aggr_sort_exprs = 
&aggr_expr.order_bys().cloned().unwrap_or_default();
+            let aggr_sort_exprs = 
aggr_expr.order_bys().unwrap_or(LexOrdering::empty());
             let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs);
-            let aggr_sort_reqs =
-                
PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs.iter());
-            let reverse_aggr_req =
-                
PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_sort_exprs.inner);
+            let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs.clone());
+            let reverse_aggr_req = 
LexRequirement::from(reverse_aggr_sort_exprs);
 
             // If the aggregate expression benefits from input ordering, and
             // there is an actual ordering enabling this, try to update the
diff --git a/datafusion/core/src/physical_optimizer/utils.rs 
b/datafusion/core/src/physical_optimizer/utils.rs
index 8007d8cc7f..9acd3f67c2 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -27,7 +27,8 @@ use crate::physical_plan::union::UnionExec;
 use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
 use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
 
-use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement};
+use datafusion_physical_expr::LexRequirement;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::tree_node::PlanContext;
 
@@ -38,7 +39,7 @@ pub fn add_sort_above<T: Clone + Default>(
     sort_requirements: LexRequirement,
     fetch: Option<usize>,
 ) -> PlanContext<T> {
-    let mut sort_expr = 
PhysicalSortRequirement::to_sort_exprs(sort_requirements);
+    let mut sort_expr = LexOrdering::from(sort_requirements);
     sort_expr.inner.retain(|sort_expr| {
         !node
             .plan
diff --git a/datafusion/physical-expr-common/src/sort_expr.rs 
b/datafusion/physical-expr-common/src/sort_expr.rs
index fc11a6df56..9ae12fa9f6 100644
--- a/datafusion/physical-expr-common/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -296,26 +296,14 @@ impl PhysicalSortRequirement {
             })
     }
 
-    /// Returns [`PhysicalSortRequirement`] that requires the exact
-    /// sort of the [`PhysicalSortExpr`]s in `ordering`
-    ///
-    /// This method takes `&'a PhysicalSortExpr` to make it easy to
-    /// use implementing [`ExecutionPlan::required_input_ordering`].
-    ///
-    /// [`ExecutionPlan::required_input_ordering`]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering
+    #[deprecated(since = "43.0.0", note = "use  
LexRequirement::from_lex_ordering")]
     pub fn from_sort_exprs<'a>(
         ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
     ) -> LexRequirement {
         let ordering = ordering.into_iter().cloned().collect();
         LexRequirement::from_lex_ordering(ordering)
     }
-
-    /// Converts an iterator of [`PhysicalSortRequirement`] into a Vec
-    /// of [`PhysicalSortExpr`]s.
-    ///
-    /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
-    /// for each entry in the input. If required ordering is None for an entry
-    /// default ordering `ASC, NULLS LAST` if given (see the 
`PhysicalSortExpr::from`).
+    #[deprecated(since = "43.0.0", note = "use  
LexOrdering::from_lex_requirement")]
     pub fn to_sort_exprs(
         requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
     ) -> LexOrdering {
@@ -416,8 +404,8 @@ impl LexOrdering {
     /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
     /// for each entry in the input. If required ordering is None for an entry
     /// default ordering `ASC, NULLS LAST` if given (see the 
`PhysicalSortExpr::from`).
-    pub fn from_lex_requirement(requirements: LexRequirement) -> LexOrdering {
-        requirements
+    pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
+        requirement
             .into_iter()
             .map(PhysicalSortExpr::from)
             .collect()
@@ -541,15 +529,10 @@ impl LexRequirement {
         self.inner.push(physical_sort_requirement)
     }
 
-    /// Create a new [`LexRequirement`] from a vector of [`PhysicalSortExpr`]s.
+    /// Create a new [`LexRequirement`] from a [`LexOrdering`]
     ///
-    /// Returns [`PhysicalSortRequirement`] that requires the exact
+    /// Returns [`LexRequirement`] that requires the exact
     /// sort of the [`PhysicalSortExpr`]s in `ordering`
-    ///
-    /// This method takes `&'a PhysicalSortExpr` to make it easy to
-    /// use implementing [`ExecutionPlan::required_input_ordering`].
-    ///
-    /// [`ExecutionPlan::required_input_ordering`]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering
     pub fn from_lex_ordering(ordering: LexOrdering) -> Self {
         Self::new(
             ordering
diff --git a/datafusion/physical-expr/src/equivalence/class.rs 
b/datafusion/physical-expr/src/equivalence/class.rs
index fbd3f3573e..e4185ad44d 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -475,11 +475,11 @@ impl EquivalenceGroup {
     /// sort expressions.
     pub fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> 
LexOrdering {
         // Convert sort expressions to sort requirements:
-        let sort_reqs = 
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
+        let sort_reqs = LexRequirement::from(sort_exprs.clone());
         // Normalize the requirements:
         let normalized_sort_reqs = 
self.normalize_sort_requirements(&sort_reqs);
         // Convert sort requirements back to sort expressions:
-        PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs.inner)
+        LexOrdering::from(normalized_sort_reqs)
     }
 
     /// This function applies the `normalize_sort_requirement` function for all
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index 061e77222f..06f1e24ed2 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -409,11 +409,11 @@ impl EquivalenceProperties {
     /// after deduplication.
     fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
         // Convert sort expressions to sort requirements:
-        let sort_reqs = 
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
+        let sort_reqs = LexRequirement::from(sort_exprs.clone());
         // Normalize the requirements:
         let normalized_sort_reqs = 
self.normalize_sort_requirements(&sort_reqs);
         // Convert sort requirements back to sort expressions:
-        PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs)
+        LexOrdering::from(normalized_sort_reqs)
     }
 
     /// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
@@ -454,7 +454,7 @@ impl EquivalenceProperties {
     /// orderings.
     pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool {
         // Convert the given sort expressions to sort requirements:
-        let sort_requirements = 
PhysicalSortRequirement::from_sort_exprs(given.iter());
+        let sort_requirements = LexRequirement::from(given.clone());
         self.ordering_satisfy_requirement(&sort_requirements)
     }
 
@@ -548,11 +548,11 @@ impl EquivalenceProperties {
         rhs: &LexOrdering,
     ) -> Option<LexOrdering> {
         // Convert the given sort expressions to sort requirements:
-        let lhs = PhysicalSortRequirement::from_sort_exprs(lhs);
-        let rhs = PhysicalSortRequirement::from_sort_exprs(rhs);
+        let lhs = LexRequirement::from(lhs.clone());
+        let rhs = LexRequirement::from(rhs.clone());
         let finer = self.get_finer_requirement(&lhs, &rhs);
         // Convert the chosen sort requirements back to sort expressions:
-        finer.map(PhysicalSortRequirement::to_sort_exprs)
+        finer.map(LexOrdering::from)
     }
 
     /// Returns the finer ordering among the requirements `lhs` and `rhs`,
diff --git a/datafusion/physical-optimizer/src/output_requirements.rs 
b/datafusion/physical-optimizer/src/output_requirements.rs
index 4f6f91a234..6c8e76bff8 100644
--- a/datafusion/physical-optimizer/src/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -33,7 +33,7 @@ use datafusion_physical_plan::{
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::{Result, Statistics};
-use datafusion_physical_expr::{Distribution, LexRequirement, 
PhysicalSortRequirement};
+use datafusion_physical_expr::{Distribution, LexRequirement};
 use 
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};
 
@@ -256,13 +256,13 @@ fn require_top_ordering_helper(
         // Therefore; we check the sort expression field of the SortExec to 
assign the requirements.
         let req_ordering = sort_exec.expr();
         let req_dist = sort_exec.required_input_distribution()[0].clone();
-        let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering);
+        let reqs = LexRequirement::from(req_ordering.clone());
         Ok((
             Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist)) 
as _,
             true,
         ))
     } else if let Some(spm) = 
plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
-        let reqs = PhysicalSortRequirement::from_sort_exprs(spm.expr());
+        let reqs = LexRequirement::from(spm.expr().clone());
         Ok((
             Arc::new(OutputRequirementExec::new(
                 plan,
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index a71039b573..2220007fdd 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1074,9 +1074,7 @@ pub fn get_finer_aggregate_exprs_requirement(
         );
     }
 
-    Ok(PhysicalSortRequirement::from_sort_exprs(
-        requirement.inner.iter(),
-    ))
+    Ok(LexRequirement::from(requirement))
 }
 
 /// Returns physical expressions for arguments to evaluate against a batch.
@@ -2304,7 +2302,7 @@ mod tests {
             &eq_properties,
             &AggregateMode::Partial,
         )?;
-        let res = PhysicalSortRequirement::to_sort_exprs(res);
+        let res = LexOrdering::from(res);
         assert_eq!(res, common_requirement);
         Ok(())
     }
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs 
b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 2f6dc5fa0b..1eb6ea6329 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -51,7 +51,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, 
MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::equivalence::join_equivalence_properties;
-use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
+use datafusion_physical_expr::PhysicalExprRef;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 use futures::{Stream, StreamExt};
 
@@ -297,12 +297,8 @@ impl ExecutionPlan for SortMergeJoinExec {
 
     fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
         vec![
-            Some(PhysicalSortRequirement::from_sort_exprs(
-                self.left_sort_exprs.iter(),
-            )),
-            Some(PhysicalSortRequirement::from_sort_exprs(
-                self.right_sort_exprs.iter(),
-            )),
+            Some(LexRequirement::from(self.left_sort_exprs.clone())),
+            Some(LexRequirement::from(self.right_sort_exprs.clone())),
         ]
     }
 
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs 
b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 84c9f03b07..94ef4d5bc3 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -70,7 +70,7 @@ use datafusion_execution::TaskContext;
 use datafusion_expr::interval_arithmetic::Interval;
 use datafusion_physical_expr::equivalence::join_equivalence_properties;
 use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
-use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
+use datafusion_physical_expr::PhysicalExprRef;
 
 use ahash::RandomState;
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
@@ -415,12 +415,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
         vec![
             self.left_sort_exprs
                 .as_ref()
-                .map(LexOrdering::iter)
-                .map(PhysicalSortRequirement::from_sort_exprs),
+                .cloned()
+                .map(LexRequirement::from),
             self.right_sort_exprs
                 .as_ref()
-                .map(LexOrdering::iter)
-                .map(PhysicalSortRequirement::from_sort_exprs),
+                .cloned()
+                .map(LexRequirement::from),
         ]
     }
 
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs 
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 5c80322afe..9a89db9a58 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -32,7 +32,6 @@ use crate::{
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalSortRequirement;
 
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 use log::{debug, trace};
@@ -203,9 +202,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
     }
 
     fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
-        vec![Some(PhysicalSortRequirement::from_sort_exprs(
-            self.expr.iter(),
-        ))]
+        vec![Some(LexRequirement::from(self.expr.clone()))]
     }
 
     fn maintains_input_order(&self) -> Vec<bool> {
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index da7f6d79e5..24aba011d0 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -516,9 +516,8 @@ pub fn get_window_mode(
     // Treat partition by exprs as constant. During analysis of requirements 
are satisfied.
     let const_exprs = partitionby_exprs.iter().map(ConstExpr::from);
     let partition_by_eqs = input_eqs.with_constants(const_exprs);
-    let order_by_reqs = 
PhysicalSortRequirement::from_sort_exprs(orderby_keys.iter());
-    let reverse_order_by_reqs =
-        
PhysicalSortRequirement::from_sort_exprs(reverse_order_bys(orderby_keys).iter());
+    let order_by_reqs = LexRequirement::from(orderby_keys.clone());
+    let reverse_order_by_reqs = 
LexRequirement::from(reverse_order_bys(orderby_keys));
     for (should_swap, order_by_reqs) in
         [(false, order_by_reqs), (true, reverse_order_by_reqs)]
     {
diff --git a/datafusion/proto/src/physical_plan/mod.rs 
b/datafusion/proto/src/physical_plan/mod.rs
index e84eae2b90..64e462d169 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -35,7 +35,7 @@ use datafusion::datasource::physical_plan::{AvroExec, 
CsvExec};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
 use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
-use datafusion::physical_expr::{LexOrdering, PhysicalExprRef, 
PhysicalSortRequirement};
+use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
 use datafusion::physical_plan::aggregates::AggregateMode;
 use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
 use datafusion::physical_plan::analyze::AnalyzeExec;
@@ -1037,7 +1037,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                             &sink_schema,
                             extension_codec,
                         )
-                        .map(|item| 
PhysicalSortRequirement::from_sort_exprs(&item.inner))
+                        .map(LexRequirement::from)
                     })
                     .transpose()?;
                 Ok(Arc::new(DataSinkExec::new(
@@ -1067,7 +1067,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                             &sink_schema,
                             extension_codec,
                         )
-                        .map(|item| 
PhysicalSortRequirement::from_sort_exprs(&item.inner))
+                        .map(LexRequirement::from)
                     })
                     .transpose()?;
                 Ok(Arc::new(DataSinkExec::new(
@@ -1104,9 +1104,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
                                 &sink_schema,
                                 extension_codec,
                             )
-                            .map(|item| {
-                                
PhysicalSortRequirement::from_sort_exprs(&item.inner)
-                            })
+                            .map(LexRequirement::from)
                         })
                         .transpose()?;
                     Ok(Arc::new(DataSinkExec::new(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to