This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9005585fa6 Deprecate `LexOrderingRef` and `LexRequirementRef` (#13233)
9005585fa6 is described below

commit 9005585fa6f4eb6a4d0cc515b6ad76794c33c626
Author: Jagdish Parihar <[email protected]>
AuthorDate: Tue Nov 5 20:21:36 2024 +0530

    Deprecate `LexOrderingRef` and `LexRequirementRef` (#13233)
    
    * converted LexOrderingRef to &LexOrdering
    
    * using  LexOrdering::from_ref fn  instead of directly cloning it
    
    * using as_ref instread of &
    
    * using as_ref
    
    * removed commented code
    
    * updated cargo lock
    
    * updated LexRequirementRef to &LexRequirement
    
    * fixed clippy issues
    
    * fixed taplo error for cargo.toml in physical-expr-common
    
    * removed commented code
    
    * fixed clippy errors
    
    * fixed clippy error
    
    * fixes
    
    * removed  LexOrdering::from_ref instead using clone and created 
LexOrdering::empty() fn
    
    * Update mod.rs
    
    ---------
    
    Co-authored-by: Berkay Şahin 
<[email protected]>
    Co-authored-by: berkaysynnada <[email protected]>
---
 benchmarks/src/sort.rs                             |  39 ++++----
 datafusion-cli/Cargo.lock                          |  33 +++----
 .../datasource/physical_plan/file_scan_config.rs   |  27 +++---
 .../src/datasource/physical_plan/statistics.rs     |  34 +++----
 .../src/physical_optimizer/enforce_distribution.rs |   8 +-
 .../core/src/physical_optimizer/enforce_sorting.rs |  17 ++--
 .../replace_with_order_preserving_variants.rs      |  15 ++-
 .../core/src/physical_optimizer/sort_pushdown.rs   |  72 ++++++++-------
 .../src/physical_optimizer/update_aggr_exprs.rs    |  19 ++--
 .../core/tests/fuzz_cases/equivalence/utils.rs     |   6 +-
 .../functions-aggregate-common/src/accumulator.rs  |   4 +-
 datafusion/functions-aggregate-common/src/utils.rs |   6 +-
 datafusion/functions-aggregate/benches/count.rs    |   4 +-
 datafusion/functions-aggregate/benches/sum.rs      |   4 +-
 datafusion/functions-aggregate/src/array_agg.rs    |   2 +-
 datafusion/functions-aggregate/src/first_last.rs   |  11 +--
 datafusion/functions-aggregate/src/nth_value.rs    |   2 +-
 datafusion/functions-aggregate/src/stddev.rs       |   6 +-
 datafusion/physical-expr-common/Cargo.toml         |   1 +
 datafusion/physical-expr-common/src/sort_expr.rs   | 102 +++++++++++++++++----
 datafusion/physical-expr-common/src/utils.rs       |   4 +-
 datafusion/physical-expr/src/aggregate.rs          |   9 +-
 datafusion/physical-expr/src/equivalence/class.rs  |   9 +-
 .../physical-expr/src/equivalence/ordering.rs      |   7 +-
 .../physical-expr/src/equivalence/properties.rs    |  34 +++----
 datafusion/physical-expr/src/lib.rs                |   3 +-
 datafusion/physical-expr/src/utils/mod.rs          |   4 +-
 datafusion/physical-expr/src/window/aggregate.rs   |   8 +-
 datafusion/physical-expr/src/window/built_in.rs    |   8 +-
 .../physical-expr/src/window/sliding_aggregate.rs  |   8 +-
 datafusion/physical-expr/src/window/window_expr.rs |   6 +-
 datafusion/physical-plan/src/aggregates/mod.rs     |   4 +-
 .../physical-plan/src/aggregates/order/mod.rs      |   4 +-
 .../physical-plan/src/aggregates/order/partial.rs  |   4 +-
 datafusion/physical-plan/src/execution_plan.rs     |  12 +--
 .../physical-plan/src/joins/stream_join_utils.rs   |   6 +-
 .../physical-plan/src/joins/symmetric_hash_join.rs |  12 +--
 datafusion/physical-plan/src/joins/utils.rs        |  16 ++--
 datafusion/physical-plan/src/repartition/mod.rs    |  10 +-
 datafusion/physical-plan/src/sorts/partial_sort.rs |   3 +-
 datafusion/physical-plan/src/sorts/sort.rs         |  36 ++++----
 .../src/sorts/sort_preserving_merge.rs             |   8 +-
 datafusion/physical-plan/src/sorts/stream.rs       |   4 +-
 .../physical-plan/src/sorts/streaming_merge.rs     |  22 ++++-
 .../src/windows/bounded_window_agg_exec.rs         |   7 +-
 datafusion/physical-plan/src/windows/mod.rs        |  10 +-
 datafusion/proto/src/physical_plan/to_proto.rs     |   2 +-
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  27 +++---
 48 files changed, 396 insertions(+), 303 deletions(-)

diff --git a/benchmarks/src/sort.rs b/benchmarks/src/sort.rs
index b2038c432f..f4b707611c 100644
--- a/benchmarks/src/sort.rs
+++ b/benchmarks/src/sort.rs
@@ -22,7 +22,7 @@ use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};
 
 use arrow::util::pretty;
 use datafusion::common::Result;
-use datafusion::physical_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};
+use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::prelude::{SessionConfig, SessionContext};
@@ -70,31 +70,28 @@ impl RunOpt {
         let sort_cases = vec![
             (
                 "sort utf8",
-                vec![PhysicalSortExpr {
+                LexOrdering::new(vec![PhysicalSortExpr {
                     expr: col("request_method", &schema)?,
                     options: Default::default(),
-                }],
+                }]),
             ),
             (
                 "sort int",
-                vec![PhysicalSortExpr {
-                    expr: col("request_bytes", &schema)?,
+                LexOrdering::new(vec![PhysicalSortExpr {
+                    expr: col("response_bytes", &schema)?,
                     options: Default::default(),
-                }],
+                }]),
             ),
             (
                 "sort decimal",
-                vec![
-                    // sort decimal
-                    PhysicalSortExpr {
-                        expr: col("decimal_price", &schema)?,
-                        options: Default::default(),
-                    },
-                ],
+                LexOrdering::new(vec![PhysicalSortExpr {
+                    expr: col("decimal_price", &schema)?,
+                    options: Default::default(),
+                }]),
             ),
             (
                 "sort integer tuple",
-                vec![
+                LexOrdering::new(vec![
                     PhysicalSortExpr {
                         expr: col("request_bytes", &schema)?,
                         options: Default::default(),
@@ -103,11 +100,11 @@ impl RunOpt {
                         expr: col("response_bytes", &schema)?,
                         options: Default::default(),
                     },
-                ],
+                ]),
             ),
             (
                 "sort utf8 tuple",
-                vec![
+                LexOrdering::new(vec![
                     // sort utf8 tuple
                     PhysicalSortExpr {
                         expr: col("service", &schema)?,
@@ -125,11 +122,11 @@ impl RunOpt {
                         expr: col("image", &schema)?,
                         options: Default::default(),
                     },
-                ],
+                ]),
             ),
             (
                 "sort mixed tuple",
-                vec![
+                LexOrdering::new(vec![
                     PhysicalSortExpr {
                         expr: col("service", &schema)?,
                         options: Default::default(),
@@ -142,7 +139,7 @@ impl RunOpt {
                         expr: col("decimal_price", &schema)?,
                         options: Default::default(),
                     },
-                ],
+                ]),
             ),
         ];
         for (title, expr) in sort_cases {
@@ -170,13 +167,13 @@ impl RunOpt {
 
 async fn exec_sort(
     ctx: &SessionContext,
-    expr: LexOrderingRef<'_>,
+    expr: &LexOrdering,
     test_file: &TestParquetFile,
     debug: bool,
 ) -> Result<(usize, std::time::Duration)> {
     let start = Instant::now();
     let scan = test_file.create_scan(ctx, None).await?;
-    let exec = Arc::new(SortExec::new(LexOrdering::new(expr.to_owned()), 
scan));
+    let exec = Arc::new(SortExec::new(expr.clone(), scan));
     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
     let elapsed = start.elapsed();
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 541d464d38..3348adb103 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -99,9 +99,9 @@ dependencies = [
 
 [[package]]
 name = "anstyle"
-version = "1.0.9"
+version = "1.0.10"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "8365de52b16c035ff4fcafe0092ba9390540e3e352870ac09933bebcaa2c8c56"
+checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
 
 [[package]]
 name = "anstyle-parse"
@@ -523,9 +523,9 @@ dependencies = [
 
 [[package]]
 name = "aws-sdk-sso"
-version = "1.47.0"
+version = "1.48.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a8776850becacbd3a82a4737a9375ddb5c6832a51379f24443a98e61513f852c"
+checksum = "ded855583fa1d22e88fe39fd6062b062376e50a8211989e07cf5e38d52eb3453"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
@@ -545,9 +545,9 @@ dependencies = [
 
 [[package]]
 name = "aws-sdk-ssooidc"
-version = "1.48.0"
+version = "1.49.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "0007b5b8004547133319b6c4e87193eee2a0bcb3e4c18c75d09febe9dab7b383"
+checksum = "9177ea1192e6601ae16c7273385690d88a7ed386a00b74a6bc894d12103cd933"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
@@ -567,9 +567,9 @@ dependencies = [
 
 [[package]]
 name = "aws-sdk-sts"
-version = "1.47.0"
+version = "1.48.0"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "9fffaa356e7f1c725908b75136d53207fa714e348f365671df14e95a60530ad3"
+checksum = "823ef553cf36713c97453e2ddff1eb8f62be7f4523544e2a5db64caf80100f0a"
 dependencies = [
  "aws-credential-types",
  "aws-runtime",
@@ -917,9 +917,9 @@ dependencies = [
 
 [[package]]
 name = "cc"
-version = "1.1.31"
+version = "1.1.34"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "c2e7962b54006dcfcc61cb72735f4d89bb97061dd6a7ed882ec6b8ee53714c6f"
+checksum = "67b9470d453346108f93a59222a9a1a5724db32d0a4727b7ab7ace4b4d822dc9"
 dependencies = [
  "jobserver",
  "libc",
@@ -1520,6 +1520,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr-common",
  "hashbrown 0.14.5",
+ "itertools",
  "rand",
 ]
 
@@ -3614,9 +3615,9 @@ checksum = 
"13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
 
 [[package]]
 name = "syn"
-version = "2.0.85"
+version = "2.0.87"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56"
+checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -3653,18 +3654,18 @@ checksum = 
"3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
 
 [[package]]
 name = "thiserror"
-version = "1.0.65"
+version = "1.0.66"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5"
+checksum = "5d171f59dbaa811dbbb1aee1e73db92ec2b122911a48e1390dfe327a821ddede"
 dependencies = [
  "thiserror-impl",
 ]
 
 [[package]]
 name = "thiserror-impl"
-version = "1.0.65"
+version = "1.0.66"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602"
+checksum = "b08be0f17bd307950653ce45db00cd31200d82b624b36e181337d9c7d92765b5"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs 
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 74ab0126a5..6a162c97b6 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -35,7 +35,6 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::stats::Precision;
 use datafusion_common::{exec_err, ColumnStatistics, DataFusionError, 
Statistics};
 use datafusion_physical_expr::LexOrdering;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
 
 use log::warn;
 
@@ -308,7 +307,7 @@ impl FileScanConfig {
     pub fn split_groups_by_statistics(
         table_schema: &SchemaRef,
         file_groups: &[Vec<PartitionedFile>],
-        sort_order: LexOrderingRef,
+        sort_order: &LexOrdering,
     ) -> Result<Vec<Vec<PartitionedFile>>> {
         let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
         // First Fit:
@@ -1113,17 +1112,19 @@ mod tests {
                     ))))
                     .collect::<Vec<_>>(),
             ));
-            let sort_order = case
-                .sort
-                .into_iter()
-                .map(|expr| {
-                    crate::physical_planner::create_physical_sort_expr(
-                        &expr,
-                        &DFSchema::try_from(table_schema.as_ref().clone())?,
-                        &ExecutionProps::default(),
-                    )
-                })
-                .collect::<Result<Vec<_>>>()?;
+            let sort_order = LexOrdering {
+                inner: case
+                    .sort
+                    .into_iter()
+                    .map(|expr| {
+                        crate::physical_planner::create_physical_sort_expr(
+                            &expr,
+                            
&DFSchema::try_from(table_schema.as_ref().clone())?,
+                            &ExecutionProps::default(),
+                        )
+                    })
+                    .collect::<Result<Vec<_>>>()?,
+            };
 
             let partitioned_files =
                 case.files.into_iter().map(From::from).collect::<Vec<_>>();
diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs 
b/datafusion/core/src/datasource/physical_plan/statistics.rs
index 6af153a731..488098e786 100644
--- a/datafusion/core/src/datasource/physical_plan/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/statistics.rs
@@ -36,7 +36,7 @@ use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 /// A normalized representation of file min/max statistics that allows for 
efficient sorting & comparison.
 /// The min/max values are ordered by [`Self::sort_order`].
@@ -50,7 +50,7 @@ pub(crate) struct MinMaxStatistics {
 impl MinMaxStatistics {
     /// Sort order used to sort the statistics
     #[allow(unused)]
-    pub fn sort_order(&self) -> LexOrderingRef {
+    pub fn sort_order(&self) -> &LexOrdering {
         &self.sort_order
     }
 
@@ -66,8 +66,8 @@ impl MinMaxStatistics {
     }
 
     pub fn new_from_files<'a>(
-        projected_sort_order: LexOrderingRef, // Sort order with respect to 
projected schema
-        projected_schema: &SchemaRef,         // Projected schema
+        projected_sort_order: &LexOrdering, // Sort order with respect to 
projected schema
+        projected_schema: &SchemaRef,       // Projected schema
         projection: Option<&[usize]>, // Indices of projection in full table 
schema (None = all columns)
         files: impl IntoIterator<Item = &'a PartitionedFile>,
     ) -> Result<Self> {
@@ -119,15 +119,17 @@ impl MinMaxStatistics {
             projected_schema
                 .project(&(sort_columns.iter().map(|c| 
c.index()).collect::<Vec<_>>()))?,
         );
-        let min_max_sort_order = sort_columns
-            .iter()
-            .zip(projected_sort_order.iter())
-            .enumerate()
-            .map(|(i, (col, sort))| PhysicalSortExpr {
-                expr: Arc::new(Column::new(col.name(), i)),
-                options: sort.options,
-            })
-            .collect::<Vec<_>>();
+        let min_max_sort_order = LexOrdering {
+            inner: sort_columns
+                .iter()
+                .zip(projected_sort_order.iter())
+                .enumerate()
+                .map(|(i, (col, sort))| PhysicalSortExpr {
+                    expr: Arc::new(Column::new(col.name(), i)),
+                    options: sort.options,
+                })
+                .collect::<Vec<_>>(),
+        };
 
         let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
             .iter()
@@ -167,7 +169,7 @@ impl MinMaxStatistics {
     }
 
     pub fn new(
-        sort_order: LexOrderingRef,
+        sort_order: &LexOrdering,
         schema: &SchemaRef,
         min_values: RecordBatch,
         max_values: RecordBatch,
@@ -257,7 +259,7 @@ impl MinMaxStatistics {
         Ok(Self {
             min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
             max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
-            sort_order: LexOrdering::from_ref(sort_order),
+            sort_order: sort_order.clone(),
         })
     }
 
@@ -278,7 +280,7 @@ impl MinMaxStatistics {
 }
 
 fn sort_columns_from_physical_sort_exprs(
-    sort_order: LexOrderingRef,
+    sort_order: &LexOrdering,
 ) -> Option<Vec<&Column>> {
     sort_order
         .iter()
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 6cd902db72..6863978610 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -52,12 +52,12 @@ use 
datafusion_physical_expr::utils::map_columns_before_projection;
 use datafusion_physical_expr::{
     physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
 };
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::windows::{get_best_fitting_window, 
BoundedWindowAggExec};
 use datafusion_physical_plan::ExecutionPlanProperties;
 
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use itertools::izip;
 
 /// The `EnforceDistribution` rule ensures that distribution requirements are
@@ -936,7 +936,11 @@ fn add_spm_on_top(input: DistributionContext) -> 
DistributionContext {
 
         let new_plan = if should_preserve_ordering {
             Arc::new(SortPreservingMergeExec::new(
-                
LexOrdering::from_ref(input.plan.output_ordering().unwrap_or(&[])),
+                input
+                    .plan
+                    .output_ordering()
+                    .unwrap_or(&LexOrdering::default())
+                    .clone(),
                 input.plan.clone(),
             )) as _
         } else {
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs 
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 7b111cddc6..adc3d7cac1 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -62,7 +62,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, 
InputOrderMode};
 use datafusion_common::plan_err;
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_physical_expr::{Partitioning, PhysicalSortRequirement};
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use datafusion_physical_plan::repartition::RepartitionExec;
@@ -224,9 +224,9 @@ fn replace_with_partial_sort(
         let sort_req = 
PhysicalSortRequirement::from_sort_exprs(sort_plan.expr());
 
         let mut common_prefix_length = 0;
-        while child_eq_properties
-            .ordering_satisfy_requirement(&sort_req[0..common_prefix_length + 
1])
-        {
+        while child_eq_properties.ordering_satisfy_requirement(&LexRequirement 
{
+            inner: sort_req[0..common_prefix_length + 1].to_vec(),
+        }) {
             common_prefix_length += 1;
         }
         if common_prefix_length > 0 {
@@ -392,7 +392,10 @@ fn analyze_immediate_sort_removal(
         let sort_input = sort_exec.input();
         // If this sort is unnecessary, we should remove it:
         if sort_input.equivalence_properties().ordering_satisfy(
-            sort_exec.properties().output_ordering().unwrap_or_default(),
+            sort_exec
+                .properties()
+                .output_ordering()
+                .unwrap_or(LexOrdering::empty()),
         ) {
             node.plan = if !sort_exec.preserve_partitioning()
                 && sort_input.output_partitioning().partition_count() > 1
@@ -632,10 +635,10 @@ fn remove_corresponding_sort_from_sub_plan(
     Ok(node)
 }
 
-/// Converts an [ExecutionPlan] trait object to a [LexOrderingRef] when 
possible.
+/// Converts an [ExecutionPlan] trait object to a [LexOrdering] reference when 
possible.
 fn get_sort_exprs(
     sort_any: &Arc<dyn ExecutionPlan>,
-) -> Result<(LexOrderingRef, Option<usize>)> {
+) -> Result<(&LexOrdering, Option<usize>)> {
     if let Some(sort_exec) = sort_any.as_any().downcast_ref::<SortExec>() {
         Ok((sort_exec.expr(), sort_exec.fetch()))
     } else if let Some(spm) = 
sort_any.as_any().downcast_ref::<SortPreservingMergeExec>()
diff --git 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 930ce52e6f..c80aea411f 100644
--- 
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ 
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -129,11 +129,13 @@ fn plan_with_order_preserving_variants(
         return Ok(sort_input);
     } else if is_coalesce_partitions(&sort_input.plan) && is_spm_better {
         let child = &sort_input.children[0].plan;
-        if let Some(ordering) = child.output_ordering().map(Vec::from) {
+        if let Some(ordering) = child.output_ordering() {
             // When the input of a `CoalescePartitionsExec` has an ordering,
             // replace it with a `SortPreservingMergeExec` if appropriate:
-            let spm =
-                SortPreservingMergeExec::new(LexOrdering::new(ordering), 
child.clone());
+            let spm = SortPreservingMergeExec::new(
+                LexOrdering::new(ordering.inner.clone()),
+                child.clone(),
+            );
             sort_input.plan = Arc::new(spm) as _;
             sort_input.children[0].data = true;
             return Ok(sort_input);
@@ -257,7 +259,12 @@ pub(crate) fn replace_with_order_preserving_variants(
     if alternate_plan
         .plan
         .equivalence_properties()
-        
.ordering_satisfy(requirements.plan.output_ordering().unwrap_or_default())
+        .ordering_satisfy(
+            requirements
+                .plan
+                .output_ordering()
+                .unwrap_or(LexOrdering::empty()),
+        )
     {
         for child in alternate_plan.children.iter_mut() {
             child.data = false;
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs 
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index 1a53077b1f..e231e719b2 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -36,10 +36,8 @@ use datafusion_common::{plan_err, HashSet, JoinSide, Result};
 use datafusion_expr::JoinType;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::utils::collect_columns;
-use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement};
-use datafusion_physical_expr_common::sort_expr::{
-    LexOrdering, LexOrderingRef, LexRequirement,
-};
+use datafusion_physical_expr::PhysicalSortRequirement;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 
 /// This is a "data class" we use within the [`EnforceSorting`] rule to push
 /// down [`SortExec`] in the plan. In some cases, we can reduce the total
@@ -87,11 +85,12 @@ fn pushdown_sorts_helper(
     let parent_reqs = requirements
         .data
         .ordering_requirement
-        .as_deref()
-        .unwrap_or(&[]);
+        .clone()
+        .unwrap_or_default();
     let satisfy_parent = plan
         .equivalence_properties()
-        .ordering_satisfy_requirement(parent_reqs);
+        .ordering_satisfy_requirement(&parent_reqs);
+
     if is_sort(plan) {
         let required_ordering = plan
             .output_ordering()
@@ -139,7 +138,7 @@ fn pushdown_sorts_helper(
         for (child, order) in requirements.children.iter_mut().zip(reqs) {
             child.data.ordering_requirement = order;
         }
-    } else if let Some(adjusted) = pushdown_requirement_to_children(plan, 
parent_reqs)? {
+    } else if let Some(adjusted) = pushdown_requirement_to_children(plan, 
&parent_reqs)? {
         // Can not satisfy the parent requirements, check whether we can push
         // requirements down:
         for (child, order) in requirements.children.iter_mut().zip(adjusted) {
@@ -162,14 +161,16 @@ fn pushdown_sorts_helper(
 
 fn pushdown_requirement_to_children(
     plan: &Arc<dyn ExecutionPlan>,
-    parent_required: LexRequirementRef,
+    parent_required: &LexRequirement,
 ) -> Result<Option<Vec<Option<LexRequirement>>>> {
     let maintains_input_order = plan.maintains_input_order();
     if is_window(plan) {
         let required_input_ordering = plan.required_input_ordering();
-        let request_child = 
required_input_ordering[0].as_deref().unwrap_or(&[]);
+        let request_child = 
required_input_ordering[0].clone().unwrap_or_default();
         let child_plan = plan.children().swap_remove(0);
-        match determine_children_requirement(parent_required, request_child, 
child_plan) {
+
+        match determine_children_requirement(parent_required, &request_child, 
child_plan)
+        {
             RequirementsCompatibility::Satisfy => {
                 let req = (!request_child.is_empty())
                     .then(|| LexRequirement::new(request_child.to_vec()));
@@ -180,7 +181,10 @@ fn pushdown_requirement_to_children(
         }
     } else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
         let sort_req = PhysicalSortRequirement::from_sort_exprs(
-            sort_exec.properties().output_ordering().unwrap_or(&[]),
+            sort_exec
+                .properties()
+                .output_ordering()
+                .unwrap_or(&LexOrdering::default()),
         );
         if sort_exec
             .properties()
@@ -202,7 +206,9 @@ fn pushdown_requirement_to_children(
             .all(|maintain| *maintain)
     {
         let output_req = PhysicalSortRequirement::from_sort_exprs(
-            plan.properties().output_ordering().unwrap_or(&[]),
+            plan.properties()
+                .output_ordering()
+                .unwrap_or(&LexOrdering::default()),
         );
         // Push down through operator with fetch when:
         // - requirement is aligned with output ordering
@@ -229,7 +235,11 @@ fn pushdown_requirement_to_children(
         let left_columns_len = smj.left().schema().fields().len();
         let parent_required_expr =
             
PhysicalSortRequirement::to_sort_exprs(parent_required.iter().cloned());
-        match expr_source_side(&parent_required_expr, smj.join_type(), 
left_columns_len) {
+        match expr_source_side(
+            parent_required_expr.as_ref(),
+            smj.join_type(),
+            left_columns_len,
+        ) {
             Some(JoinSide::Left) => try_pushdown_requirements_to_join(
                 smj,
                 parent_required,
@@ -275,7 +285,8 @@ fn pushdown_requirement_to_children(
         spm_eqs = spm_eqs.with_reorder(new_ordering);
         // Do not push-down through SortPreservingMergeExec when
         // ordering requirement invalidates requirement of sort preserving 
merge exec.
-        if 
!spm_eqs.ordering_satisfy(plan.output_ordering().unwrap_or_default()) {
+        if 
!spm_eqs.ordering_satisfy(&plan.output_ordering().cloned().unwrap_or_default())
+        {
             Ok(None)
         } else {
             // Can push-down through SortPreservingMergeExec, because parent 
requirement is finer
@@ -293,7 +304,7 @@ fn pushdown_requirement_to_children(
 /// Return true if pushing the sort requirements through a node would violate
 /// the input sorting requirements for the plan
 fn pushdown_would_violate_requirements(
-    parent_required: LexRequirementRef,
+    parent_required: &LexRequirement,
     child: &dyn ExecutionPlan,
 ) -> bool {
     child
@@ -319,8 +330,8 @@ fn pushdown_would_violate_requirements(
 /// - If parent requirements are more specific, push down parent requirements.
 /// - If they are not compatible, need to add a sort.
 fn determine_children_requirement(
-    parent_required: LexRequirementRef,
-    request_child: LexRequirementRef,
+    parent_required: &LexRequirement,
+    request_child: &LexRequirement,
     child_plan: &Arc<dyn ExecutionPlan>,
 ) -> RequirementsCompatibility {
     if child_plan
@@ -345,8 +356,8 @@ fn determine_children_requirement(
 
 fn try_pushdown_requirements_to_join(
     smj: &SortMergeJoinExec,
-    parent_required: LexRequirementRef,
-    sort_expr: LexOrderingRef,
+    parent_required: &LexRequirement,
+    sort_expr: &LexOrdering,
     push_side: JoinSide,
 ) -> Result<Option<Vec<Option<LexRequirement>>>> {
     let left_eq_properties = smj.left().equivalence_properties();
@@ -354,13 +365,13 @@ fn try_pushdown_requirements_to_join(
     let mut smj_required_orderings = smj.required_input_ordering();
     let right_requirement = smj_required_orderings.swap_remove(1);
     let left_requirement = smj_required_orderings.swap_remove(0);
-    let left_ordering = smj.left().output_ordering().unwrap_or_default();
-    let right_ordering = smj.right().output_ordering().unwrap_or_default();
+    let left_ordering = 
&smj.left().output_ordering().cloned().unwrap_or_default();
+    let right_ordering = 
&smj.right().output_ordering().cloned().unwrap_or_default();
+
     let (new_left_ordering, new_right_ordering) = match push_side {
         JoinSide::Left => {
-            let left_eq_properties = left_eq_properties
-                .clone()
-                .with_reorder(LexOrdering::from_ref(sort_expr));
+            let left_eq_properties =
+                left_eq_properties.clone().with_reorder(sort_expr.clone());
             if left_eq_properties
                 
.ordering_satisfy_requirement(&left_requirement.unwrap_or_default())
             {
@@ -371,9 +382,8 @@ fn try_pushdown_requirements_to_join(
             }
         }
         JoinSide::Right => {
-            let right_eq_properties = right_eq_properties
-                .clone()
-                .with_reorder(LexOrdering::from_ref(sort_expr));
+            let right_eq_properties =
+                right_eq_properties.clone().with_reorder(sort_expr.clone());
             if right_eq_properties
                 
.ordering_satisfy_requirement(&right_requirement.unwrap_or_default())
             {
@@ -417,7 +427,7 @@ fn try_pushdown_requirements_to_join(
 }
 
 fn expr_source_side(
-    required_exprs: LexOrderingRef,
+    required_exprs: &LexOrdering,
     join_type: JoinType,
     left_columns_len: usize,
 ) -> Option<JoinSide> {
@@ -469,7 +479,7 @@ fn expr_source_side(
 }
 
 fn shift_right_required(
-    parent_required: LexRequirementRef,
+    parent_required: &LexRequirement,
     left_columns_len: usize,
 ) -> Result<LexRequirement> {
     let new_right_required = parent_required
@@ -507,7 +517,7 @@ fn shift_right_required(
 /// pushed down, `Ok(None)` if not. On error, returns a `Result::Err`.
 fn handle_custom_pushdown(
     plan: &Arc<dyn ExecutionPlan>,
-    parent_required: LexRequirementRef,
+    parent_required: &LexRequirement,
     maintains_input_order: Vec<bool>,
 ) -> Result<Option<Vec<Option<LexRequirement>>>> {
     // If there's no requirement from the parent or the plan has no children, 
return early
diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs 
b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
index d85278556c..c9363b00e1 100644
--- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
+++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
@@ -27,6 +27,7 @@ use 
datafusion_physical_expr::aggregate::AggregateFunctionExpr;
 use datafusion_physical_expr::{
     reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement,
 };
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::aggregates::concat_slices;
 use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
@@ -138,7 +139,7 @@ fn try_convert_aggregate_if_better(
     aggr_exprs
         .into_iter()
         .map(|aggr_expr| {
-            let aggr_sort_exprs = &aggr_expr.order_bys().unwrap_or_default();
+            let aggr_sort_exprs = 
&aggr_expr.order_bys().cloned().unwrap_or_default();
             let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs);
             let aggr_sort_reqs =
                 
PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs.iter());
@@ -151,14 +152,20 @@ fn try_convert_aggregate_if_better(
             // Otherwise, leave it as is.
             if aggr_expr.order_sensitivity().is_beneficial() && 
!aggr_sort_reqs.is_empty()
             {
-                let reqs = concat_slices(prefix_requirement, &aggr_sort_reqs);
+                let reqs = LexRequirement {
+                    inner: concat_slices(prefix_requirement, &aggr_sort_reqs),
+                };
+
+                let prefix_requirement = LexRequirement {
+                    inner: prefix_requirement.to_vec(),
+                };
+
                 if eq_properties.ordering_satisfy_requirement(&reqs) {
                     // Existing ordering satisfies the aggregator requirements:
                     aggr_expr.with_beneficial_ordering(true)?.map(Arc::new)
-                } else if 
eq_properties.ordering_satisfy_requirement(&concat_slices(
-                    prefix_requirement,
-                    &reverse_aggr_req,
-                )) {
+                } else if 
eq_properties.ordering_satisfy_requirement(&LexRequirement {
+                    inner: concat_slices(&prefix_requirement, 
&reverse_aggr_req),
+                }) {
                     // Converting to reverse enables more efficient execution
                     // given the existing ordering (if possible):
                     aggr_expr
diff --git a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs 
b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
index 35da8b5963..921332bca5 100644
--- a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
+++ b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
@@ -32,7 +32,7 @@ use datafusion_expr::sort_properties::{ExprProperties, 
SortProperties};
 use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
 use datafusion_physical_expr::equivalence::{EquivalenceClass, 
ProjectionMapping};
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 use itertools::izip;
 use rand::prelude::*;
@@ -465,7 +465,7 @@ pub fn generate_table_for_orderings(
 
     // prune out rows that is invalid according to remaining orderings.
     for ordering in orderings.iter().skip(1) {
-        let sort_columns = get_sort_columns(&batch, ordering.as_ref())?;
+        let sort_columns = get_sort_columns(&batch, ordering)?;
 
         // Collect sort options and values into separate vectors.
         let (sort_options, sort_col_values): (Vec<_>, Vec<_>) = sort_columns
@@ -530,7 +530,7 @@ fn generate_random_f64_array(
 // Helper function to get sort columns from a batch
 fn get_sort_columns(
     batch: &RecordBatch,
-    ordering: LexOrderingRef,
+    ordering: &LexOrdering,
 ) -> Result<Vec<SortColumn>> {
     ordering
         .iter()
diff --git a/datafusion/functions-aggregate-common/src/accumulator.rs 
b/datafusion/functions-aggregate-common/src/accumulator.rs
index 67ada56280..a230bb0289 100644
--- a/datafusion/functions-aggregate-common/src/accumulator.rs
+++ b/datafusion/functions-aggregate-common/src/accumulator.rs
@@ -19,7 +19,7 @@ use arrow::datatypes::{DataType, Field, Schema};
 use datafusion_common::Result;
 use datafusion_expr_common::accumulator::Accumulator;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use std::sync::Arc;
 
 /// [`AccumulatorArgs`] contains information about how an aggregate
@@ -52,7 +52,7 @@ pub struct AccumulatorArgs<'a> {
     /// ```
     ///
     /// If no `ORDER BY` is specified, `ordering_req` will be empty.
-    pub ordering_req: LexOrderingRef<'a>,
+    pub ordering_req: &'a LexOrdering,
 
     /// Whether the aggregation is running in reverse order
     pub is_reversed: bool,
diff --git a/datafusion/functions-aggregate-common/src/utils.rs 
b/datafusion/functions-aggregate-common/src/utils.rs
index f55e5ec9a4..e440abe2de 100644
--- a/datafusion/functions-aggregate-common/src/utils.rs
+++ b/datafusion/functions-aggregate-common/src/utils.rs
@@ -30,7 +30,7 @@ use arrow::{
 };
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_expr_common::accumulator::Accumulator;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 /// Convert scalar values from an accumulator into arrays.
 pub fn get_accum_scalar_values_as_arrays(
@@ -88,7 +88,7 @@ pub fn adjust_output_array(data_type: &DataType, array: 
ArrayRef) -> Result<Arra
 
 /// Construct corresponding fields for lexicographical ordering requirement 
expression
 pub fn ordering_fields(
-    ordering_req: LexOrderingRef,
+    ordering_req: &LexOrdering,
     // Data type of each expression in the ordering requirement
     data_types: &[DataType],
 ) -> Vec<Field> {
@@ -107,7 +107,7 @@ pub fn ordering_fields(
 }
 
 /// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
-pub fn get_sort_options(ordering_req: LexOrderingRef) -> Vec<SortOptions> {
+pub fn get_sort_options(ordering_req: &LexOrdering) -> Vec<SortOptions> {
     ordering_req.iter().map(|item| item.options).collect()
 }
 
diff --git a/datafusion/functions-aggregate/benches/count.rs 
b/datafusion/functions-aggregate/benches/count.rs
index 1c8266ed5b..e6b62e6e18 100644
--- a/datafusion/functions-aggregate/benches/count.rs
+++ b/datafusion/functions-aggregate/benches/count.rs
@@ -23,7 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, 
Criterion};
 use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, 
GroupsAccumulator};
 use datafusion_functions_aggregate::count::Count;
 use datafusion_physical_expr::expressions::col;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use std::sync::Arc;
 
 fn prepare_accumulator() -> Box<dyn GroupsAccumulator> {
@@ -32,7 +32,7 @@ fn prepare_accumulator() -> Box<dyn GroupsAccumulator> {
         return_type: &DataType::Int64,
         schema: &schema,
         ignore_nulls: false,
-        ordering_req: LexOrderingRef::default(),
+        ordering_req: &LexOrdering::default(),
         is_reversed: false,
         name: "COUNT(f)",
         is_distinct: false,
diff --git a/datafusion/functions-aggregate/benches/sum.rs 
b/datafusion/functions-aggregate/benches/sum.rs
index 1e9493280e..1c180126a3 100644
--- a/datafusion/functions-aggregate/benches/sum.rs
+++ b/datafusion/functions-aggregate/benches/sum.rs
@@ -23,7 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main, 
Criterion};
 use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl, 
GroupsAccumulator};
 use datafusion_functions_aggregate::sum::Sum;
 use datafusion_physical_expr::expressions::col;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use std::sync::Arc;
 
 fn prepare_accumulator(data_type: &DataType) -> Box<dyn GroupsAccumulator> {
@@ -32,7 +32,7 @@ fn prepare_accumulator(data_type: &DataType) -> Box<dyn 
GroupsAccumulator> {
         return_type: data_type,
         schema: &schema,
         ignore_nulls: false,
-        ordering_req: LexOrderingRef::default(),
+        ordering_req: &LexOrdering::default(),
         is_reversed: false,
         name: "SUM(f)",
         is_distinct: false,
diff --git a/datafusion/functions-aggregate/src/array_agg.rs 
b/datafusion/functions-aggregate/src/array_agg.rs
index 7c22c21e38..252a07cb11 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -135,7 +135,7 @@ impl AggregateUDFImpl for ArrayAgg {
         OrderSensitiveArrayAggAccumulator::try_new(
             &data_type,
             &ordering_dtypes,
-            LexOrdering::from_ref(acc_args.ordering_req),
+            acc_args.ordering_req.clone(),
             acc_args.is_reversed,
         )
         .map(|acc| Box::new(acc) as _)
diff --git a/datafusion/functions-aggregate/src/first_last.rs 
b/datafusion/functions-aggregate/src/first_last.rs
index 0b05713499..3ca1422668 100644
--- a/datafusion/functions-aggregate/src/first_last.rs
+++ b/datafusion/functions-aggregate/src/first_last.rs
@@ -37,7 +37,7 @@ use datafusion_expr::{
     ExprFunctionExt, Signature, SortExpr, TypeSignature, Volatility,
 };
 use datafusion_functions_aggregate_common::utils::get_sort_options;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 create_func!(FirstValue, first_value_udaf);
 
@@ -130,7 +130,7 @@ impl AggregateUDFImpl for FirstValue {
         FirstValueAccumulator::try_new(
             acc_args.return_type,
             &ordering_dtypes,
-            LexOrdering::from_ref(acc_args.ordering_req),
+            acc_args.ordering_req.clone(),
             acc_args.ignore_nulls,
         )
         .map(|acc| 
Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
@@ -455,7 +455,7 @@ impl AggregateUDFImpl for LastValue {
         LastValueAccumulator::try_new(
             acc_args.return_type,
             &ordering_dtypes,
-            LexOrdering::from_ref(acc_args.ordering_req),
+            acc_args.ordering_req.clone(),
             acc_args.ignore_nulls,
         )
         .map(|acc| 
Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
@@ -723,10 +723,7 @@ fn filter_states_according_to_is_set(
 }
 
 /// Combines array refs and their corresponding orderings to construct 
`SortColumn`s.
-fn convert_to_sort_cols(
-    arrs: &[ArrayRef],
-    sort_exprs: LexOrderingRef,
-) -> Vec<SortColumn> {
+fn convert_to_sort_cols(arrs: &[ArrayRef], sort_exprs: &LexOrdering) -> 
Vec<SortColumn> {
     arrs.iter()
         .zip(sort_exprs.iter())
         .map(|(item, sort_expr)| SortColumn {
diff --git a/datafusion/functions-aggregate/src/nth_value.rs 
b/datafusion/functions-aggregate/src/nth_value.rs
index 5f3a8cf2f1..f3e892fa73 100644
--- a/datafusion/functions-aggregate/src/nth_value.rs
+++ b/datafusion/functions-aggregate/src/nth_value.rs
@@ -133,7 +133,7 @@ impl AggregateUDFImpl for NthValueAgg {
             n,
             &data_type,
             &ordering_dtypes,
-            LexOrdering::from_ref(acc_args.ordering_req),
+            acc_args.ordering_req.clone(),
         )
         .map(|acc| Box::new(acc) as _)
     }
diff --git a/datafusion/functions-aggregate/src/stddev.rs 
b/datafusion/functions-aggregate/src/stddev.rs
index 95269ed821..b785d8e985 100644
--- a/datafusion/functions-aggregate/src/stddev.rs
+++ b/datafusion/functions-aggregate/src/stddev.rs
@@ -410,7 +410,7 @@ mod tests {
     use datafusion_expr::AggregateUDF;
     use 
datafusion_functions_aggregate_common::utils::get_accum_scalar_values_as_arrays;
     use datafusion_physical_expr::expressions::col;
-    use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+    use datafusion_physical_expr_common::sort_expr::LexOrdering;
     use std::sync::Arc;
 
     #[test]
@@ -462,7 +462,7 @@ mod tests {
             return_type: &DataType::Float64,
             schema,
             ignore_nulls: false,
-            ordering_req: LexOrderingRef::default(),
+            ordering_req: &LexOrdering::default(),
             name: "a",
             is_distinct: false,
             is_reversed: false,
@@ -473,7 +473,7 @@ mod tests {
             return_type: &DataType::Float64,
             schema,
             ignore_nulls: false,
-            ordering_req: LexOrderingRef::default(),
+            ordering_req: &LexOrdering::default(),
             name: "a",
             is_distinct: false,
             is_reversed: false,
diff --git a/datafusion/physical-expr-common/Cargo.toml 
b/datafusion/physical-expr-common/Cargo.toml
index 45ccb08e52..ad27c9d49c 100644
--- a/datafusion/physical-expr-common/Cargo.toml
+++ b/datafusion/physical-expr-common/Cargo.toml
@@ -41,4 +41,5 @@ arrow = { workspace = true }
 datafusion-common = { workspace = true, default-features = true }
 datafusion-expr-common = { workspace = true }
 hashbrown = { workspace = true }
+itertools = { workspace = true }
 rand = { workspace = true }
diff --git a/datafusion/physical-expr-common/src/sort_expr.rs 
b/datafusion/physical-expr-common/src/sort_expr.rs
index addf2fbfca..f91d583215 100644
--- a/datafusion/physical-expr-common/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -22,7 +22,7 @@ use std::fmt;
 use std::fmt::{Display, Formatter};
 use std::hash::{Hash, Hasher};
 use std::ops::{Deref, Index, Range, RangeFrom, RangeTo};
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
 use std::vec::IntoIter;
 
 use arrow::compute::kernels::sort::{SortColumn, SortOptions};
@@ -30,6 +30,7 @@ use arrow::datatypes::Schema;
 use arrow::record_batch::RecordBatch;
 use datafusion_common::Result;
 use datafusion_expr_common::columnar_value::ColumnarValue;
+use itertools::Itertools;
 
 /// Represents Sort operation for a column in a RecordBatch
 ///
@@ -218,7 +219,7 @@ impl From<PhysicalSortRequirement> for PhysicalSortExpr {
     /// If options is `None`, the default sort options `ASC, NULLS LAST` is 
used.
     ///
     /// The default is picked to be consistent with
-    /// PostgreSQL: 
<https://www.postgresql.org/docs/current/queries-order.html>    
+    /// PostgreSQL: 
<https://www.postgresql.org/docs/current/queries-order.html>
     fn from(value: PhysicalSortRequirement) -> Self {
         let options = value.options.unwrap_or(SortOptions {
             descending: false,
@@ -309,13 +310,8 @@ impl PhysicalSortRequirement {
     pub fn from_sort_exprs<'a>(
         ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
     ) -> LexRequirement {
-        LexRequirement::new(
-            ordering
-                .into_iter()
-                .cloned()
-                .map(PhysicalSortRequirement::from)
-                .collect(),
-        )
+        let ordering = ordering.into_iter().cloned().collect();
+        LexRequirement::from_lex_ordering(ordering)
     }
 
     /// Converts an iterator of [`PhysicalSortRequirement`] into a Vec
@@ -327,10 +323,8 @@ impl PhysicalSortRequirement {
     pub fn to_sort_exprs(
         requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
     ) -> LexOrdering {
-        requirements
-            .into_iter()
-            .map(PhysicalSortExpr::from)
-            .collect()
+        let requirements = requirements.into_iter().collect();
+        LexOrdering::from_lex_requirement(requirements)
     }
 }
 
@@ -352,14 +346,23 @@ pub struct LexOrdering {
     pub inner: Vec<PhysicalSortExpr>,
 }
 
+impl AsRef<LexOrdering> for LexOrdering {
+    fn as_ref(&self) -> &LexOrdering {
+        self
+    }
+}
+
+static EMPTY_ORDER: OnceLock<LexOrdering> = OnceLock::new();
+
 impl LexOrdering {
     // Creates a new [`LexOrdering`] from a vector
     pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
         Self { inner }
     }
 
-    pub fn as_ref(&self) -> LexOrderingRef {
-        &self.inner
+    /// Return an empty LexOrdering (no expressions)
+    pub fn empty() -> &'static LexOrdering {
+        EMPTY_ORDER.get_or_init(LexOrdering::default)
     }
 
     pub fn capacity(&self) -> usize {
@@ -378,10 +381,6 @@ impl LexOrdering {
         self.inner.extend(iter)
     }
 
-    pub fn from_ref(lex_ordering_ref: LexOrderingRef) -> Self {
-        Self::new(lex_ordering_ref.to_vec())
-    }
-
     pub fn is_empty(&self) -> bool {
         self.inner.is_empty()
     }
@@ -409,6 +408,36 @@ impl LexOrdering {
     pub fn truncate(&mut self, len: usize) {
         self.inner.truncate(len)
     }
+
+    /// Merge the contents of `other` into `self`, removing duplicates.
+    pub fn merge(mut self, other: LexOrdering) -> Self {
+        self.inner = self.inner.into_iter().chain(other).unique().collect();
+        self
+    }
+
+    /// Converts a `LexRequirement` into a `LexOrdering`.
+    ///
+    /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
+    /// for each entry in the input. If required ordering is None for an entry
+    /// default ordering `ASC, NULLS LAST` if given (see the 
`PhysicalSortExpr::from`).
+    pub fn from_lex_requirement(requirements: LexRequirement) -> LexOrdering {
+        requirements
+            .into_iter()
+            .map(PhysicalSortExpr::from)
+            .collect()
+    }
+}
+
+impl From<Vec<PhysicalSortExpr>> for LexOrdering {
+    fn from(value: Vec<PhysicalSortExpr>) -> Self {
+        Self::new(value)
+    }
+}
+
+impl From<LexRequirement> for LexOrdering {
+    fn from(value: LexRequirement) -> Self {
+        Self::from_lex_requirement(value)
+    }
 }
 
 impl Deref for LexOrdering {
@@ -489,6 +518,7 @@ impl IntoIterator for LexOrdering {
 
 ///`LexOrderingRef` is an alias for the type &`[PhysicalSortExpr]`, which 
represents
 /// a reference to a lexicographical ordering.
+#[deprecated(since = "43.0.0", note = "use &LexOrdering instead")]
 pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];
 
 ///`LexRequirement` is an struct containing a `Vec<PhysicalSortRequirement>`, 
which
@@ -514,6 +544,30 @@ impl LexRequirement {
     pub fn push(&mut self, physical_sort_requirement: PhysicalSortRequirement) 
{
         self.inner.push(physical_sort_requirement)
     }
+
+    /// Create a new [`LexRequirement`] from a vector of [`PhysicalSortExpr`]s.
+    ///
+    /// Returns [`PhysicalSortRequirement`] that requires the exact
+    /// sort of the [`PhysicalSortExpr`]s in `ordering`
+    ///
+    /// This method takes `&'a PhysicalSortExpr` to make it easy to
+    /// use implementing [`ExecutionPlan::required_input_ordering`].
+    ///
+    /// [`ExecutionPlan::required_input_ordering`]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering
+    pub fn from_lex_ordering(ordering: LexOrdering) -> Self {
+        Self::new(
+            ordering
+                .into_iter()
+                .map(PhysicalSortRequirement::from)
+                .collect(),
+        )
+    }
+}
+
+impl From<LexOrdering> for LexRequirement {
+    fn from(value: LexOrdering) -> Self {
+        Self::from_lex_ordering(value)
+    }
 }
 
 impl Deref for LexRequirement {
@@ -545,6 +599,16 @@ impl IntoIterator for LexRequirement {
     }
 }
 
+impl<'a> IntoIterator for &'a LexOrdering {
+    type Item = &'a PhysicalSortExpr;
+    type IntoIter = std::slice::Iter<'a, PhysicalSortExpr>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.inner.iter()
+    }
+}
+
 ///`LexRequirementRef` is an alias for the type &`[PhysicalSortRequirement]`, 
which
 /// represents a reference to a lexicographical ordering requirement.
+/// #[deprecated(since = "43.0.0", note = "use &LexRequirement instead")]
 pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement];
diff --git a/datafusion/physical-expr-common/src/utils.rs 
b/datafusion/physical-expr-common/src/utils.rs
index 26293b1a76..ffdab6c6d3 100644
--- a/datafusion/physical-expr-common/src/utils.rs
+++ b/datafusion/physical-expr-common/src/utils.rs
@@ -24,7 +24,7 @@ use datafusion_common::Result;
 use datafusion_expr_common::sort_properties::ExprProperties;
 
 use crate::physical_expr::PhysicalExpr;
-use crate::sort_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};
+use crate::sort_expr::{LexOrdering, PhysicalSortExpr};
 use crate::tree_node::ExprContext;
 
 /// Represents a [`PhysicalExpr`] node with associated properties (order and
@@ -96,7 +96,7 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) -> 
Result<ArrayRef> {
 /// Reverses the ORDER BY expression, which is useful during equivalent window
 /// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns 
into
 /// 'ORDER BY a DESC, NULLS FIRST'.
-pub fn reverse_order_bys(order_bys: LexOrderingRef) -> LexOrdering {
+pub fn reverse_order_bys(order_bys: &LexOrdering) -> LexOrdering {
     order_bys
         .iter()
         .map(|e| PhysicalSortExpr::new(e.expr.clone(), !e.options))
diff --git a/datafusion/physical-expr/src/aggregate.rs 
b/datafusion/physical-expr/src/aggregate.rs
index e446776aff..5dc1389334 100644
--- a/datafusion/physical-expr/src/aggregate.rs
+++ b/datafusion/physical-expr/src/aggregate.rs
@@ -45,7 +45,7 @@ use 
datafusion_functions_aggregate_common::accumulator::AccumulatorArgs;
 use datafusion_functions_aggregate_common::accumulator::StateFieldsArgs;
 use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_expr_common::utils::reverse_order_bys;
 
 use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
@@ -292,7 +292,7 @@ impl AggregateFunctionExpr {
     /// Order by requirements for the aggregate function
     /// By default it is `None` (there is no requirement)
     /// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)` 
should implement this
-    pub fn order_bys(&self) -> Option<LexOrderingRef> {
+    pub fn order_bys(&self) -> Option<&LexOrdering> {
         if self.ordering_req.is_empty() {
             return None;
         }
@@ -490,7 +490,10 @@ impl AggregateFunctionExpr {
     /// These expressions are  (1)function arguments, (2) order by expressions.
     pub fn all_expressions(&self) -> AggregatePhysicalExpressions {
         let args = self.expressions();
-        let order_bys = self.order_bys().unwrap_or_default();
+        let order_bys = self
+            .order_bys()
+            .cloned()
+            .unwrap_or_else(LexOrdering::default);
         let order_by_exprs = order_bys
             .iter()
             .map(|sort_expr| Arc::clone(&sort_expr.expr))
diff --git a/datafusion/physical-expr/src/equivalence/class.rs 
b/datafusion/physical-expr/src/equivalence/class.rs
index 7305bc1b0a..35ff6f685b 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -21,9 +21,8 @@ use std::sync::Arc;
 use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
 use crate::{
     expressions::Column, physical_expr::deduplicate_physical_exprs,
-    physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, 
LexOrderingRef,
-    LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalExprRef, 
PhysicalSortExpr,
-    PhysicalSortRequirement,
+    physical_exprs_bag_equal, physical_exprs_contains, LexOrdering, 
LexRequirement,
+    PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement,
 };
 
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -475,7 +474,7 @@ impl EquivalenceGroup {
     /// This function applies the `normalize_sort_expr` function for all sort
     /// expressions in `sort_exprs` and returns the corresponding normalized
     /// sort expressions.
-    pub fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> 
LexOrdering {
+    pub fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> 
LexOrdering {
         // Convert sort expressions to sort requirements:
         let sort_reqs = 
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
         // Normalize the requirements:
@@ -489,7 +488,7 @@ impl EquivalenceGroup {
     /// sort requirements.
     pub fn normalize_sort_requirements(
         &self,
-        sort_reqs: LexRequirementRef,
+        sort_reqs: &LexRequirement,
     ) -> LexRequirement {
         collapse_lex_req(LexRequirement::new(
             sort_reqs
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs 
b/datafusion/physical-expr/src/equivalence/ordering.rs
index 838c9800f9..06f85b657e 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -146,12 +146,7 @@ impl OrderingEquivalenceClass {
     /// Returns the concatenation of all the orderings. This enables merge
     /// operations to preserve all equivalent orderings simultaneously.
     pub fn output_ordering(&self) -> Option<LexOrdering> {
-        let output_ordering = self
-            .orderings
-            .iter()
-            .flat_map(|ordering| ordering.as_ref())
-            .cloned()
-            .collect();
+        let output_ordering = 
self.orderings.iter().flatten().cloned().collect();
         let output_ordering = collapse_lex_ordering(output_ordering);
         (!output_ordering.is_empty()).then_some(output_ordering)
     }
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index 55c99e93d0..061e77222f 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -30,9 +30,8 @@ use crate::equivalence::{
 };
 use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
 use crate::{
-    physical_exprs_contains, ConstExpr, LexOrdering, LexOrderingRef, 
LexRequirement,
-    LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
-    PhysicalSortRequirement,
+    physical_exprs_contains, ConstExpr, LexOrdering, LexRequirement, 
PhysicalExpr,
+    PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement,
 };
 
 use arrow_schema::{SchemaRef, SortOptions};
@@ -197,7 +196,7 @@ impl EquivalenceProperties {
         OrderingEquivalenceClass::new(
             self.oeq_class
                 .iter()
-                .map(|ordering| self.normalize_sort_exprs(ordering.as_ref()))
+                .map(|ordering| self.normalize_sort_exprs(ordering))
                 .collect(),
         )
     }
@@ -408,7 +407,7 @@ impl EquivalenceProperties {
     /// function would return `vec![a ASC, c ASC]`. Internally, it would first
     /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final 
result
     /// after deduplication.
-    fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering {
+    fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
         // Convert sort expressions to sort requirements:
         let sort_reqs = 
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
         // Normalize the requirements:
@@ -430,10 +429,7 @@ impl EquivalenceProperties {
     /// function would return `vec![a ASC, c ASC]`. Internally, it would first
     /// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final 
result
     /// after deduplication.
-    fn normalize_sort_requirements(
-        &self,
-        sort_reqs: LexRequirementRef,
-    ) -> LexRequirement {
+    fn normalize_sort_requirements(&self, sort_reqs: &LexRequirement) -> 
LexRequirement {
         let normalized_sort_reqs = 
self.eq_group.normalize_sort_requirements(sort_reqs);
         let mut constant_exprs = vec![];
         constant_exprs.extend(
@@ -456,7 +452,7 @@ impl EquivalenceProperties {
 
     /// Checks whether the given ordering is satisfied by any of the existing
     /// orderings.
-    pub fn ordering_satisfy(&self, given: LexOrderingRef) -> bool {
+    pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool {
         // Convert the given sort expressions to sort requirements:
         let sort_requirements = 
PhysicalSortRequirement::from_sort_exprs(given.iter());
         self.ordering_satisfy_requirement(&sort_requirements)
@@ -464,7 +460,7 @@ impl EquivalenceProperties {
 
     /// Checks whether the given sort requirements are satisfied by any of the
     /// existing orderings.
-    pub fn ordering_satisfy_requirement(&self, reqs: LexRequirementRef) -> 
bool {
+    pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
         let mut eq_properties = self.clone();
         // First, standardize the given requirement:
         let normalized_reqs = eq_properties.normalize_sort_requirements(reqs);
@@ -525,8 +521,8 @@ impl EquivalenceProperties {
     /// than the `reference` sort requirements.
     pub fn requirements_compatible(
         &self,
-        given: LexRequirementRef,
-        reference: LexRequirementRef,
+        given: &LexRequirement,
+        reference: &LexRequirement,
     ) -> bool {
         let normalized_given = self.normalize_sort_requirements(given);
         let normalized_reference = self.normalize_sort_requirements(reference);
@@ -548,8 +544,8 @@ impl EquivalenceProperties {
     /// the latter.
     pub fn get_finer_ordering(
         &self,
-        lhs: LexOrderingRef,
-        rhs: LexOrderingRef,
+        lhs: &LexOrdering,
+        rhs: &LexOrdering,
     ) -> Option<LexOrdering> {
         // Convert the given sort expressions to sort requirements:
         let lhs = PhysicalSortRequirement::from_sort_exprs(lhs);
@@ -569,8 +565,8 @@ impl EquivalenceProperties {
     /// is the latter.
     pub fn get_finer_requirement(
         &self,
-        req1: LexRequirementRef,
-        req2: LexRequirementRef,
+        req1: &LexRequirement,
+        req2: &LexRequirement,
     ) -> Option<LexRequirement> {
         let mut lhs = self.normalize_sort_requirements(req1);
         let mut rhs = self.normalize_sort_requirements(req2);
@@ -606,7 +602,7 @@ impl EquivalenceProperties {
     pub fn substitute_ordering_component(
         &self,
         mapping: &ProjectionMapping,
-        sort_expr: LexOrderingRef,
+        sort_expr: &LexOrdering,
     ) -> Result<Vec<LexOrdering>> {
         let new_orderings = sort_expr
             .iter()
@@ -656,7 +652,7 @@ impl EquivalenceProperties {
         let orderings = &self.oeq_class.orderings;
         let new_order = orderings
             .iter()
-            .map(|order| self.substitute_ordering_component(mapping, 
order.as_ref()))
+            .map(|order| self.substitute_ordering_component(mapping, order))
             .collect::<Result<Vec<_>>>()?;
         let new_order = new_order.into_iter().flatten().collect();
         self.oeq_class = OrderingEquivalenceClass::new(new_order);
diff --git a/datafusion/physical-expr/src/lib.rs 
b/datafusion/physical-expr/src/lib.rs
index e7c2b4119c..405b6bbd69 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -54,8 +54,7 @@ pub use physical_expr::{
 
 pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 pub use datafusion_physical_expr_common::sort_expr::{
-    LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef, 
PhysicalSortExpr,
-    PhysicalSortRequirement,
+    LexOrdering, LexRequirement, PhysicalSortExpr, PhysicalSortRequirement,
 };
 
 pub use planner::{create_physical_expr, create_physical_exprs};
diff --git a/datafusion/physical-expr/src/utils/mod.rs 
b/datafusion/physical-expr/src/utils/mod.rs
index 73d744b4b6..30cfecf0e2 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -33,7 +33,7 @@ use datafusion_common::tree_node::{
 use datafusion_common::{HashMap, HashSet, Result};
 use datafusion_expr::Operator;
 
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use itertools::Itertools;
 use petgraph::graph::NodeIndex;
 use petgraph::stable_graph::StableGraph;
@@ -244,7 +244,7 @@ pub fn reassign_predicate_columns(
 }
 
 /// Merge left and right sort expressions, checking for duplicates.
-pub fn merge_vectors(left: LexOrderingRef, right: LexOrderingRef) -> 
LexOrdering {
+pub fn merge_vectors(left: &LexOrdering, right: &LexOrdering) -> LexOrdering {
     left.iter()
         .cloned()
         .chain(right.iter().cloned())
diff --git a/datafusion/physical-expr/src/window/aggregate.rs 
b/datafusion/physical-expr/src/window/aggregate.rs
index 94960c95e4..0c56bdc929 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -34,7 +34,7 @@ use crate::{reverse_order_bys, PhysicalExpr};
 use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::{Accumulator, WindowFrame};
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 /// A window expr that takes the form of an aggregate function.
 ///
@@ -52,13 +52,13 @@ impl PlainAggregateWindowExpr {
     pub fn new(
         aggregate: Arc<AggregateFunctionExpr>,
         partition_by: &[Arc<dyn PhysicalExpr>],
-        order_by: LexOrderingRef,
+        order_by: &LexOrdering,
         window_frame: Arc<WindowFrame>,
     ) -> Self {
         Self {
             aggregate,
             partition_by: partition_by.to_vec(),
-            order_by: LexOrdering::from_ref(order_by),
+            order_by: order_by.clone(),
             window_frame,
         }
     }
@@ -124,7 +124,7 @@ impl WindowExpr for PlainAggregateWindowExpr {
         &self.partition_by
     }
 
-    fn order_by(&self) -> LexOrderingRef {
+    fn order_by(&self) -> &LexOrdering {
         self.order_by.as_ref()
     }
 
diff --git a/datafusion/physical-expr/src/window/built_in.rs 
b/datafusion/physical-expr/src/window/built_in.rs
index 5f6c5e5c2c..0f6c3f9218 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -33,7 +33,7 @@ use datafusion_common::utils::evaluate_partition_ranges;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::window_state::{WindowAggState, WindowFrameContext};
 use datafusion_expr::WindowFrame;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 /// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`].
 #[derive(Debug)]
@@ -49,13 +49,13 @@ impl BuiltInWindowExpr {
     pub fn new(
         expr: Arc<dyn BuiltInWindowFunctionExpr>,
         partition_by: &[Arc<dyn PhysicalExpr>],
-        order_by: LexOrderingRef,
+        order_by: &LexOrdering,
         window_frame: Arc<WindowFrame>,
     ) -> Self {
         Self {
             expr,
             partition_by: partition_by.to_vec(),
-            order_by: LexOrdering::from_ref(order_by),
+            order_by: order_by.clone(),
             window_frame,
         }
     }
@@ -118,7 +118,7 @@ impl WindowExpr for BuiltInWindowExpr {
         &self.partition_by
     }
 
-    fn order_by(&self) -> LexOrderingRef {
+    fn order_by(&self) -> &LexOrdering {
         self.order_by.as_ref()
     }
 
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs 
b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 1e46baae7b..572eb8866a 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -33,7 +33,7 @@ use crate::window::{
 use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::{Accumulator, WindowFrame};
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 /// A window expr that takes the form of an aggregate function that
 /// can be incrementally computed over sliding windows.
@@ -52,13 +52,13 @@ impl SlidingAggregateWindowExpr {
     pub fn new(
         aggregate: Arc<AggregateFunctionExpr>,
         partition_by: &[Arc<dyn PhysicalExpr>],
-        order_by: LexOrderingRef,
+        order_by: &LexOrdering,
         window_frame: Arc<WindowFrame>,
     ) -> Self {
         Self {
             aggregate,
             partition_by: partition_by.to_vec(),
-            order_by: LexOrdering::from_ref(order_by),
+            order_by: order_by.clone(),
             window_frame,
         }
     }
@@ -108,7 +108,7 @@ impl WindowExpr for SlidingAggregateWindowExpr {
         &self.partition_by
     }
 
-    fn order_by(&self) -> LexOrderingRef {
+    fn order_by(&self) -> &LexOrdering {
         self.order_by.as_ref()
     }
 
diff --git a/datafusion/physical-expr/src/window/window_expr.rs 
b/datafusion/physical-expr/src/window/window_expr.rs
index 0f882def44..828e5ad206 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -20,7 +20,7 @@ use std::fmt::Debug;
 use std::ops::Range;
 use std::sync::Arc;
 
-use crate::{LexOrderingRef, PhysicalExpr};
+use crate::{LexOrdering, PhysicalExpr};
 
 use arrow::array::{new_empty_array, Array, ArrayRef};
 use arrow::compute::kernels::sort::SortColumn;
@@ -109,7 +109,7 @@ pub trait WindowExpr: Send + Sync + Debug {
     fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
 
     /// Expressions that's from the window function's order by clause, empty 
if absent
-    fn order_by(&self) -> LexOrderingRef;
+    fn order_by(&self) -> &LexOrdering;
 
     /// Get order by columns, empty if absent
     fn order_by_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>> 
{
@@ -344,7 +344,7 @@ pub(crate) fn is_end_bound_safe(
     window_frame_ctx: &WindowFrameContext,
     order_bys: &[ArrayRef],
     most_recent_order_bys: Option<&[ArrayRef]>,
-    sort_exprs: LexOrderingRef,
+    sort_exprs: &LexOrdering,
     idx: usize,
 ) -> Result<bool> {
     if sort_exprs.is_empty() {
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index 5ffe797c5c..a71039b573 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -940,7 +940,7 @@ fn get_aggregate_expr_req(
         return LexOrdering::default();
     }
 
-    let mut req = 
LexOrdering::from_ref(aggr_expr.order_bys().unwrap_or_default());
+    let mut req = aggr_expr.order_bys().cloned().unwrap_or_default();
 
     // In non-first stage modes, we accumulate data (using `merge_batch`) from
     // different partitions (i.e. merge partial results). During this merge, we
@@ -983,7 +983,7 @@ fn finer_ordering(
     agg_mode: &AggregateMode,
 ) -> Option<LexOrdering> {
     let aggr_req = get_aggregate_expr_req(aggr_expr, group_by, agg_mode);
-    eq_properties.get_finer_ordering(existing_req.as_ref(), aggr_req.as_ref())
+    eq_properties.get_finer_ordering(existing_req, aggr_req.as_ref())
 }
 
 /// Concatenates the given slices.
diff --git a/datafusion/physical-plan/src/aggregates/order/mod.rs 
b/datafusion/physical-plan/src/aggregates/order/mod.rs
index 24846d2395..7d9a50e20a 100644
--- a/datafusion/physical-plan/src/aggregates/order/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/order/mod.rs
@@ -19,7 +19,7 @@ use arrow_array::ArrayRef;
 use arrow_schema::Schema;
 use datafusion_common::Result;
 use datafusion_expr::EmitTo;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use std::mem::size_of;
 
 mod full;
@@ -45,7 +45,7 @@ impl GroupOrdering {
     pub fn try_new(
         input_schema: &Schema,
         mode: &InputOrderMode,
-        ordering: LexOrderingRef,
+        ordering: &LexOrdering,
     ) -> Result<Self> {
         match mode {
             InputOrderMode::Linear => Ok(GroupOrdering::None),
diff --git a/datafusion/physical-plan/src/aggregates/order/partial.rs 
b/datafusion/physical-plan/src/aggregates/order/partial.rs
index 5cc55dc0d0..5a05b88798 100644
--- a/datafusion/physical-plan/src/aggregates/order/partial.rs
+++ b/datafusion/physical-plan/src/aggregates/order/partial.rs
@@ -21,7 +21,7 @@ use arrow_schema::Schema;
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::proxy::VecAllocExt;
 use datafusion_expr::EmitTo;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use std::mem::size_of;
 use std::sync::Arc;
 
@@ -107,7 +107,7 @@ impl GroupOrderingPartial {
     pub fn try_new(
         input_schema: &Schema,
         order_indices: &[usize],
-        ordering: LexOrderingRef,
+        ordering: &LexOrdering,
     ) -> Result<Self> {
         assert!(!order_indices.is_empty());
         assert!(order_indices.len() <= ordering.len());
diff --git a/datafusion/physical-plan/src/execution_plan.rs 
b/datafusion/physical-plan/src/execution_plan.rs
index d65320dbab..7220e7594e 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -38,7 +38,7 @@ pub use datafusion_physical_expr::{
     expressions, udf, Distribution, Partitioning, PhysicalExpr,
 };
 use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
-use datafusion_physical_expr_common::sort_expr::{LexOrderingRef, 
LexRequirement};
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
 
 use crate::coalesce_partitions::CoalescePartitionsExec;
 use crate::display::DisplayableExecutionPlan;
@@ -443,7 +443,7 @@ pub trait ExecutionPlanProperties {
     /// For example, `SortExec` (obviously) produces sorted output as does
     /// `SortPreservingMergeStream`. Less obviously, `Projection` produces 
sorted
     /// output if its input is sorted as it does not reorder the input rows.
-    fn output_ordering(&self) -> Option<LexOrderingRef>;
+    fn output_ordering(&self) -> Option<&LexOrdering>;
 
     /// Get the [`EquivalenceProperties`] within the plan.
     ///
@@ -474,7 +474,7 @@ impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
         self.properties().execution_mode()
     }
 
-    fn output_ordering(&self) -> Option<LexOrderingRef> {
+    fn output_ordering(&self) -> Option<&LexOrdering> {
         self.properties().output_ordering()
     }
 
@@ -492,7 +492,7 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan {
         self.properties().execution_mode()
     }
 
-    fn output_ordering(&self) -> Option<LexOrderingRef> {
+    fn output_ordering(&self) -> Option<&LexOrdering> {
         self.properties().output_ordering()
     }
 
@@ -643,8 +643,8 @@ impl PlanProperties {
         &self.partitioning
     }
 
-    pub fn output_ordering(&self) -> Option<LexOrderingRef> {
-        self.output_ordering.as_deref()
+    pub fn output_ordering(&self) -> Option<&LexOrdering> {
+        self.output_ordering.as_ref()
     }
 
     pub fn execution_mode(&self) -> ExecutionMode {
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs 
b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index f08ce0ea2f..cea04ccad3 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -40,7 +40,7 @@ use 
datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
 use datafusion_physical_expr::utils::collect_columns;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use hashbrown::raw::RawTable;
 
 /// Implementation of `JoinHashMapType` for `PruningJoinHashMap`.
@@ -744,8 +744,8 @@ pub fn prepare_sorted_exprs(
     filter: &JoinFilter,
     left: &Arc<dyn ExecutionPlan>,
     right: &Arc<dyn ExecutionPlan>,
-    left_sort_exprs: LexOrderingRef,
-    right_sort_exprs: LexOrderingRef,
+    left_sort_exprs: &LexOrdering,
+    right_sort_exprs: &LexOrdering,
 ) -> Result<(SortedFilterExpr, SortedFilterExpr, ExprIntervalGraph)> {
     let err = || {
         datafusion_common::plan_datafusion_err!("Filter does not include the 
child order")
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs 
b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index f082bdbdd3..84c9f03b07 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -73,9 +73,7 @@ use 
datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
 use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
 
 use ahash::RandomState;
-use datafusion_physical_expr_common::sort_expr::{
-    LexOrdering, LexOrderingRef, LexRequirement,
-};
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 use futures::{ready, Stream, StreamExt};
 use parking_lot::Mutex;
 
@@ -319,13 +317,13 @@ impl SymmetricHashJoinExec {
     }
 
     /// Get left_sort_exprs
-    pub fn left_sort_exprs(&self) -> Option<LexOrderingRef> {
-        self.left_sort_exprs.as_deref()
+    pub fn left_sort_exprs(&self) -> Option<&LexOrdering> {
+        self.left_sort_exprs.as_ref()
     }
 
     /// Get right_sort_exprs
-    pub fn right_sort_exprs(&self) -> Option<LexOrderingRef> {
-        self.right_sort_exprs.as_deref()
+    pub fn right_sort_exprs(&self) -> Option<&LexOrdering> {
+        self.right_sort_exprs.as_ref()
     }
 
     /// Check if order information covers every column in the filter 
expression.
diff --git a/datafusion/physical-plan/src/joins/utils.rs 
b/datafusion/physical-plan/src/joins/utils.rs
index a257119a8b..0366c9fa5e 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -51,7 +51,7 @@ use datafusion_physical_expr::equivalence::add_offset_to_expr;
 use datafusion_physical_expr::expressions::Column;
 use datafusion_physical_expr::utils::{collect_columns, merge_vectors};
 use datafusion_physical_expr::{
-    LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalExprRef, 
PhysicalSortExpr,
+    LexOrdering, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
 };
 
 use futures::future::{BoxFuture, Shared};
@@ -469,7 +469,7 @@ fn replace_on_columns_of_right_ordering(
 }
 
 fn offset_ordering(
-    ordering: LexOrderingRef,
+    ordering: &LexOrdering,
     join_type: &JoinType,
     offset: usize,
 ) -> LexOrdering {
@@ -483,14 +483,14 @@ fn offset_ordering(
                 options: sort_expr.options,
             })
             .collect(),
-        _ => LexOrdering::from_ref(ordering),
+        _ => ordering.clone(),
     }
 }
 
 /// Calculate the output ordering of a given join operation.
 pub fn calculate_join_output_ordering(
-    left_ordering: LexOrderingRef,
-    right_ordering: LexOrderingRef,
+    left_ordering: &LexOrdering,
+    right_ordering: &LexOrdering,
     join_type: JoinType,
     on_columns: &[(PhysicalExprRef, PhysicalExprRef)],
     left_columns_len: usize,
@@ -503,7 +503,7 @@ pub fn calculate_join_output_ordering(
             if join_type == JoinType::Inner && probe_side == 
Some(JoinSide::Left) {
                 replace_on_columns_of_right_ordering(
                     on_columns,
-                    &mut LexOrdering::from_ref(right_ordering),
+                    &mut right_ordering.clone(),
                 )
                 .ok()?;
                 merge_vectors(
@@ -512,7 +512,7 @@ pub fn calculate_join_output_ordering(
                         .as_ref(),
                 )
             } else {
-                LexOrdering::from_ref(left_ordering)
+                left_ordering.clone()
             }
         }
         [false, true] => {
@@ -520,7 +520,7 @@ pub fn calculate_join_output_ordering(
             if join_type == JoinType::Inner && probe_side == 
Some(JoinSide::Right) {
                 replace_on_columns_of_right_ordering(
                     on_columns,
-                    &mut LexOrdering::from_ref(right_ordering),
+                    &mut right_ordering.clone(),
                 )
                 .ok()?;
                 merge_vectors(
diff --git a/datafusion/physical-plan/src/repartition/mod.rs 
b/datafusion/physical-plan/src/repartition/mod.rs
index 4d0dbc75d4..1730c7d8dc 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -29,6 +29,7 @@ use super::metrics::{self, ExecutionPlanMetricsSet, 
MetricBuilder, MetricsSet};
 use super::{
     DisplayAs, ExecutionPlanProperties, RecordBatchStream, 
SendableRecordBatchStream,
 };
+use crate::execution_plan::CardinalityEffect;
 use crate::hash_utils::create_hashes;
 use crate::metrics::BaselineMetrics;
 use crate::repartition::distributor_channels::{
@@ -48,10 +49,9 @@ use datafusion_common_runtime::SpawnedTask;
 use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
-use crate::execution_plan::CardinalityEffect;
 use datafusion_common::HashMap;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
 use futures::stream::Stream;
 use futures::{FutureExt, StreamExt, TryStreamExt};
 use log::trace;
@@ -503,7 +503,7 @@ impl DisplayAs for RepartitionExec {
                 }
 
                 if let Some(sort_exprs) = self.sort_exprs() {
-                    write!(f, ", sort_exprs={}", 
LexOrdering::from_ref(sort_exprs))?;
+                    write!(f, ", sort_exprs={}", sort_exprs.clone())?;
                 }
                 Ok(())
             }
@@ -572,7 +572,7 @@ impl ExecutionPlan for RepartitionExec {
         let schema_captured = Arc::clone(&schema);
 
         // Get existing ordering to use for merging
-        let sort_exprs = self.sort_exprs().unwrap_or(&[]).to_owned();
+        let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
 
         let stream = futures::stream::once(async move {
             let num_input_partitions = 
input.output_partitioning().partition_count();
@@ -756,7 +756,7 @@ impl RepartitionExec {
     }
 
     /// Return the sort expressions that are used to merge
-    fn sort_exprs(&self) -> Option<&[PhysicalSortExpr]> {
+    fn sort_exprs(&self) -> Option<&LexOrdering> {
         if self.preserve_order {
             self.input.output_ordering()
         } else {
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs 
b/datafusion/physical-plan/src/sorts/partial_sort.rs
index 8f853464c9..e69989c1be 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -72,7 +72,6 @@ use datafusion_common::Result;
 use datafusion_execution::{RecordBatchStream, TaskContext};
 use datafusion_physical_expr::LexOrdering;
 
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
 use futures::{ready, Stream, StreamExt};
 use log::trace;
 
@@ -159,7 +158,7 @@ impl PartialSortExec {
     }
 
     /// Sort expressions
-    pub fn expr(&self) -> LexOrderingRef {
+    pub fn expr(&self) -> &LexOrdering {
         self.expr.as_ref()
     }
 
diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
index d90d0f64ce..ce7efce415 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -52,9 +52,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, 
MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::LexOrdering;
-use datafusion_physical_expr_common::sort_expr::{
-    LexOrderingRef, PhysicalSortRequirement,
-};
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
 
 use crate::execution_plan::CardinalityEffect;
 use futures::{StreamExt, TryStreamExt};
@@ -344,10 +342,12 @@ impl ExternalSorter {
                 streams.push(stream);
             }
 
+            let expressions: LexOrdering = self.expr.iter().cloned().collect();
+
             StreamingMergeBuilder::new()
                 .with_streams(streams)
                 .with_schema(Arc::clone(&self.schema))
-                .with_expressions(self.expr.to_vec().as_slice())
+                .with_expressions(expressions.as_ref())
                 .with_metrics(self.metrics.baseline.clone())
                 .with_batch_size(self.batch_size)
                 .with_fetch(self.fetch)
@@ -536,10 +536,12 @@ impl ExternalSorter {
             })
             .collect::<Result<_>>()?;
 
+        let expressions: LexOrdering = self.expr.iter().cloned().collect();
+
         StreamingMergeBuilder::new()
             .with_streams(streams)
             .with_schema(Arc::clone(&self.schema))
-            .with_expressions(self.expr.as_ref())
+            .with_expressions(expressions.as_ref())
             .with_metrics(metrics)
             .with_batch_size(self.batch_size)
             .with_fetch(self.fetch)
@@ -561,7 +563,7 @@ impl ExternalSorter {
         let schema = batch.schema();
 
         let fetch = self.fetch;
-        let expressions = Arc::clone(&self.expr);
+        let expressions: LexOrdering = self.expr.iter().cloned().collect();
         let stream = futures::stream::once(futures::future::lazy(move |_| {
             let timer = metrics.elapsed_compute().timer();
             let sorted = sort_batch(&batch, &expressions, fetch)?;
@@ -603,7 +605,7 @@ impl Debug for ExternalSorter {
 
 pub fn sort_batch(
     batch: &RecordBatch,
-    expressions: LexOrderingRef,
+    expressions: &LexOrdering,
     fetch: Option<usize>,
 ) -> Result<RecordBatch> {
     let sort_columns = expressions
@@ -762,8 +764,8 @@ impl SortExec {
     }
 
     /// Sort expressions
-    pub fn expr(&self) -> LexOrderingRef {
-        self.expr.as_ref()
+    pub fn expr(&self) -> &LexOrdering {
+        &self.expr
     }
 
     /// If `Some(fetch)`, limits output to only the first "fetch" items
@@ -790,11 +792,10 @@ impl SortExec {
         preserve_partitioning: bool,
     ) -> PlanProperties {
         // Determine execution mode:
-        let sort_satisfied = 
input.equivalence_properties().ordering_satisfy_requirement(
-            PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter())
-                .inner
-                .as_slice(),
-        );
+        let requirement = LexRequirement::from(sort_exprs);
+        let sort_satisfied = input
+            .equivalence_properties()
+            .ordering_satisfy_requirement(&requirement);
         let mode = match input.execution_mode() {
             ExecutionMode::Unbounded if sort_satisfied => 
ExecutionMode::Unbounded,
             ExecutionMode::Bounded => ExecutionMode::Bounded,
@@ -803,6 +804,7 @@ impl SortExec {
 
         // Calculate equivalence properties; i.e. reset the ordering 
equivalence
         // class with the new ordering:
+        let sort_exprs = LexOrdering::from(requirement);
         let eq_properties = input
             .equivalence_properties()
             .clone()
@@ -890,11 +892,7 @@ impl ExecutionPlan for SortExec {
         let sort_satisfied = self
             .input
             .equivalence_properties()
-            .ordering_satisfy_requirement(
-                PhysicalSortRequirement::from_sort_exprs(self.expr.iter())
-                    .inner
-                    .as_slice(),
-            );
+            
.ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));
 
         match (sort_satisfied, self.fetch.as_ref()) {
             (true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs 
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 9ee0faaa0a..5c80322afe 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -34,9 +34,7 @@ use datafusion_execution::memory_pool::MemoryConsumer;
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::PhysicalSortRequirement;
 
-use datafusion_physical_expr_common::sort_expr::{
-    LexOrdering, LexOrderingRef, LexRequirement,
-};
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
 use log::{debug, trace};
 
 /// Sort preserving merge execution plan
@@ -122,8 +120,8 @@ impl SortPreservingMergeExec {
     }
 
     /// Sort expressions
-    pub fn expr(&self) -> LexOrderingRef {
-        &self.expr
+    pub fn expr(&self) -> &LexOrdering {
+        self.expr.as_ref()
     }
 
     /// Fetch
diff --git a/datafusion/physical-plan/src/sorts/stream.rs 
b/datafusion/physical-plan/src/sorts/stream.rs
index 70beb2c4a9..7c57fdf9ba 100644
--- a/datafusion/physical-plan/src/sorts/stream.rs
+++ b/datafusion/physical-plan/src/sorts/stream.rs
@@ -24,7 +24,7 @@ use arrow::record_batch::RecordBatch;
 use arrow::row::{RowConverter, SortField};
 use datafusion_common::Result;
 use datafusion_execution::memory_pool::MemoryReservation;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use futures::stream::{Fuse, StreamExt};
 use std::marker::PhantomData;
 use std::sync::Arc;
@@ -93,7 +93,7 @@ pub struct RowCursorStream {
 impl RowCursorStream {
     pub fn try_new(
         schema: &Schema,
-        expressions: LexOrderingRef,
+        expressions: &LexOrdering,
         streams: Vec<SendableRecordBatchStream>,
         reservation: MemoryReservation,
     ) -> Result<Self> {
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs 
b/datafusion/physical-plan/src/sorts/streaming_merge.rs
index bd74685eac..4350235ef4 100644
--- a/datafusion/physical-plan/src/sorts/streaming_merge.rs
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -28,7 +28,7 @@ use arrow::datatypes::{DataType, SchemaRef};
 use arrow_array::*;
 use datafusion_common::{internal_err, Result};
 use datafusion_execution::memory_pool::MemoryReservation;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 macro_rules! primitive_merge_helper {
     ($t:ty, $($v:ident),+) => {
@@ -51,11 +51,10 @@ macro_rules! merge_helper {
     }};
 }
 
-#[derive(Default)]
 pub struct StreamingMergeBuilder<'a> {
     streams: Vec<SendableRecordBatchStream>,
     schema: Option<SchemaRef>,
-    expressions: LexOrderingRef<'a>,
+    expressions: &'a LexOrdering,
     metrics: Option<BaselineMetrics>,
     batch_size: Option<usize>,
     fetch: Option<usize>,
@@ -63,6 +62,21 @@ pub struct StreamingMergeBuilder<'a> {
     enable_round_robin_tie_breaker: bool,
 }
 
+impl<'a> Default for StreamingMergeBuilder<'a> {
+    fn default() -> Self {
+        Self {
+            streams: vec![],
+            schema: None,
+            expressions: LexOrdering::empty(),
+            metrics: None,
+            batch_size: None,
+            fetch: None,
+            reservation: None,
+            enable_round_robin_tie_breaker: false,
+        }
+    }
+}
+
 impl<'a> StreamingMergeBuilder<'a> {
     pub fn new() -> Self {
         Self {
@@ -81,7 +95,7 @@ impl<'a> StreamingMergeBuilder<'a> {
         self
     }
 
-    pub fn with_expressions(mut self, expressions: LexOrderingRef<'a>) -> Self 
{
+    pub fn with_expressions(mut self, expressions: &'a LexOrdering) -> Self {
         self.expressions = expressions;
         self
     }
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs 
b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index c3e0a4e389..602efa54f8 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -1188,7 +1188,6 @@ mod tests {
     };
     use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
 
-    use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
     use futures::future::Shared;
     use futures::{pin_mut, ready, FutureExt, Stream, StreamExt};
     use itertools::Itertools;
@@ -1555,7 +1554,7 @@ mod tests {
             Arc::new(BuiltInWindowExpr::new(
                 last_value_func,
                 &[],
-                LexOrderingRef::default(),
+                &LexOrdering::default(),
                 Arc::new(WindowFrame::new_bounds(
                     WindowFrameUnits::Rows,
                     WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
@@ -1566,7 +1565,7 @@ mod tests {
             Arc::new(BuiltInWindowExpr::new(
                 nth_value_func1,
                 &[],
-                LexOrderingRef::default(),
+                &LexOrdering::default(),
                 Arc::new(WindowFrame::new_bounds(
                     WindowFrameUnits::Rows,
                     WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
@@ -1577,7 +1576,7 @@ mod tests {
             Arc::new(BuiltInWindowExpr::new(
                 nth_value_func2,
                 &[],
-                LexOrderingRef::default(),
+                &LexOrdering::default(),
                 Arc::new(WindowFrame::new_bounds(
                     WindowFrameUnits::Rows,
                     WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index 217823fb6a..da7f6d79e5 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -53,7 +53,7 @@ use datafusion_physical_expr::expressions::Column;
 pub use datafusion_physical_expr::window::{
     BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
 };
-use datafusion_physical_expr_common::sort_expr::{LexOrderingRef, 
LexRequirement};
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
 pub use window_agg_exec::WindowAggExec;
 
 /// Build field from window function and add it into schema
@@ -98,7 +98,7 @@ pub fn create_window_expr(
     name: String,
     args: &[Arc<dyn PhysicalExpr>],
     partition_by: &[Arc<dyn PhysicalExpr>],
-    order_by: LexOrderingRef,
+    order_by: &LexOrdering,
     window_frame: Arc<WindowFrame>,
     input_schema: &Schema,
     ignore_nulls: bool,
@@ -139,7 +139,7 @@ pub fn create_window_expr(
 /// Creates an appropriate [`WindowExpr`] based on the window frame and
 fn window_expr_from_aggregate_expr(
     partition_by: &[Arc<dyn PhysicalExpr>],
-    order_by: LexOrderingRef,
+    order_by: &LexOrdering,
     window_frame: Arc<WindowFrame>,
     aggregate: Arc<AggregateFunctionExpr>,
 ) -> Arc<dyn WindowExpr> {
@@ -497,7 +497,7 @@ pub fn get_best_fitting_window(
 /// the mode this window operator should work in to accommodate the existing 
ordering.
 pub fn get_window_mode(
     partitionby_exprs: &[Arc<dyn PhysicalExpr>],
-    orderby_keys: LexOrderingRef,
+    orderby_keys: &LexOrdering,
     input: &Arc<dyn ExecutionPlan>,
 ) -> Option<(bool, InputOrderMode)> {
     let input_eqs = input.equivalence_properties().clone();
@@ -699,7 +699,7 @@ mod tests {
                 "count".to_owned(),
                 &[col("a", &schema)?],
                 &[],
-                LexOrderingRef::default(),
+                &LexOrdering::default(),
                 Arc::new(WindowFrame::new(None)),
                 schema.as_ref(),
                 false,
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index 4bf7e35332..dc94ad075c 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -53,7 +53,7 @@ pub fn serialize_physical_aggr_expr(
 ) -> Result<protobuf::PhysicalExprNode> {
     let expressions = serialize_physical_exprs(&aggr_expr.expressions(), 
codec)?;
     let ordering_req = match aggr_expr.order_bys() {
-        Some(order) => LexOrdering::from_ref(order),
+        Some(order) => order.clone(),
         None => LexOrdering::default(),
     };
     let ordering_req = serialize_physical_sort_exprs(ordering_req, codec)?;
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 1e078ee410..8c8dcccee3 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -52,8 +52,7 @@ use datafusion::logical_expr::{create_udf, JoinType, 
Operator, Volatility};
 use datafusion::physical_expr::expressions::Literal;
 use datafusion::physical_expr::window::SlidingAggregateWindowExpr;
 use datafusion::physical_expr::{
-    LexOrdering, LexOrderingRef, LexRequirement, PhysicalSortRequirement,
-    ScalarFunctionExpr,
+    LexOrdering, LexRequirement, PhysicalSortRequirement, ScalarFunctionExpr,
 };
 use datafusion::physical_plan::aggregates::{
     AggregateExec, AggregateMode, PhysicalGroupBy,
@@ -288,13 +287,15 @@ fn roundtrip_window() -> Result<()> {
             false,
         )),
         &[col("b", &schema)?],
-        &[PhysicalSortExpr {
-            expr: col("a", &schema)?,
-            options: SortOptions {
-                descending: false,
-                nulls_first: false,
-            },
-        }],
+        &LexOrdering{
+            inner: vec![PhysicalSortExpr {
+                expr: col("a", &schema)?,
+                options: SortOptions {
+                    descending: false,
+                    nulls_first: false,
+                },
+            }]
+        },
         Arc::new(window_frame),
     ));
 
@@ -308,7 +309,7 @@ fn roundtrip_window() -> Result<()> {
         .build()
         .map(Arc::new)?,
         &[],
-        LexOrderingRef::default(),
+        &LexOrdering::default(),
         Arc::new(WindowFrame::new(None)),
     ));
 
@@ -328,7 +329,7 @@ fn roundtrip_window() -> Result<()> {
     let sliding_aggr_window_expr = Arc::new(SlidingAggregateWindowExpr::new(
         sum_expr,
         &[],
-        LexOrderingRef::default(),
+        &LexOrdering::default(),
         Arc::new(window_frame),
     ));
 
@@ -1014,7 +1015,7 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> {
         vec![Arc::new(PlainAggregateWindowExpr::new(
             aggr_expr.clone(),
             &[col("author", &schema)?],
-            LexOrderingRef::default(),
+            &LexOrdering::default(),
             Arc::new(WindowFrame::new(None)),
         ))],
         filter,
@@ -1075,7 +1076,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> 
Result<()> {
         vec![Arc::new(PlainAggregateWindowExpr::new(
             aggr_expr,
             &[col("author", &schema)?],
-            LexOrderingRef::default(),
+            &LexOrdering::default(),
             Arc::new(WindowFrame::new(None)),
         ))],
         filter,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to