This is an automated email from the ASF dual-hosted git repository.
berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 9005585fa6 Deprecate `LexOrderingRef` and `LexRequirementRef` (#13233)
9005585fa6 is described below
commit 9005585fa6f4eb6a4d0cc515b6ad76794c33c626
Author: Jagdish Parihar <[email protected]>
AuthorDate: Tue Nov 5 20:21:36 2024 +0530
Deprecate `LexOrderingRef` and `LexRequirementRef` (#13233)
* converted LexOrderingRef to &LexOrdering
* using LexOrdering::from_ref fn instead of directly cloning it
* using as_ref instead of &
* using as_ref
* removed commented code
* updated cargo lock
* updated LexRequirementRef to &LexRequirement
* fixed clippy issues
* fixed taplo error for cargo.toml in physical-expr-common
* removed commented code
* fixed clippy errors
* fixed clippy error
* fixes
* removed LexOrdering::from_ref, using clone instead, and created
LexOrdering::empty() fn
* Update mod.rs
---------
Co-authored-by: Berkay Şahin
<[email protected]>
Co-authored-by: berkaysynnada <[email protected]>
---
benchmarks/src/sort.rs | 39 ++++----
datafusion-cli/Cargo.lock | 33 +++----
.../datasource/physical_plan/file_scan_config.rs | 27 +++---
.../src/datasource/physical_plan/statistics.rs | 34 +++----
.../src/physical_optimizer/enforce_distribution.rs | 8 +-
.../core/src/physical_optimizer/enforce_sorting.rs | 17 ++--
.../replace_with_order_preserving_variants.rs | 15 ++-
.../core/src/physical_optimizer/sort_pushdown.rs | 72 ++++++++-------
.../src/physical_optimizer/update_aggr_exprs.rs | 19 ++--
.../core/tests/fuzz_cases/equivalence/utils.rs | 6 +-
.../functions-aggregate-common/src/accumulator.rs | 4 +-
datafusion/functions-aggregate-common/src/utils.rs | 6 +-
datafusion/functions-aggregate/benches/count.rs | 4 +-
datafusion/functions-aggregate/benches/sum.rs | 4 +-
datafusion/functions-aggregate/src/array_agg.rs | 2 +-
datafusion/functions-aggregate/src/first_last.rs | 11 +--
datafusion/functions-aggregate/src/nth_value.rs | 2 +-
datafusion/functions-aggregate/src/stddev.rs | 6 +-
datafusion/physical-expr-common/Cargo.toml | 1 +
datafusion/physical-expr-common/src/sort_expr.rs | 102 +++++++++++++++++----
datafusion/physical-expr-common/src/utils.rs | 4 +-
datafusion/physical-expr/src/aggregate.rs | 9 +-
datafusion/physical-expr/src/equivalence/class.rs | 9 +-
.../physical-expr/src/equivalence/ordering.rs | 7 +-
.../physical-expr/src/equivalence/properties.rs | 34 +++----
datafusion/physical-expr/src/lib.rs | 3 +-
datafusion/physical-expr/src/utils/mod.rs | 4 +-
datafusion/physical-expr/src/window/aggregate.rs | 8 +-
datafusion/physical-expr/src/window/built_in.rs | 8 +-
.../physical-expr/src/window/sliding_aggregate.rs | 8 +-
datafusion/physical-expr/src/window/window_expr.rs | 6 +-
datafusion/physical-plan/src/aggregates/mod.rs | 4 +-
.../physical-plan/src/aggregates/order/mod.rs | 4 +-
.../physical-plan/src/aggregates/order/partial.rs | 4 +-
datafusion/physical-plan/src/execution_plan.rs | 12 +--
.../physical-plan/src/joins/stream_join_utils.rs | 6 +-
.../physical-plan/src/joins/symmetric_hash_join.rs | 12 +--
datafusion/physical-plan/src/joins/utils.rs | 16 ++--
datafusion/physical-plan/src/repartition/mod.rs | 10 +-
datafusion/physical-plan/src/sorts/partial_sort.rs | 3 +-
datafusion/physical-plan/src/sorts/sort.rs | 36 ++++----
.../src/sorts/sort_preserving_merge.rs | 8 +-
datafusion/physical-plan/src/sorts/stream.rs | 4 +-
.../physical-plan/src/sorts/streaming_merge.rs | 22 ++++-
.../src/windows/bounded_window_agg_exec.rs | 7 +-
datafusion/physical-plan/src/windows/mod.rs | 10 +-
datafusion/proto/src/physical_plan/to_proto.rs | 2 +-
.../proto/tests/cases/roundtrip_physical_plan.rs | 27 +++---
48 files changed, 396 insertions(+), 303 deletions(-)
diff --git a/benchmarks/src/sort.rs b/benchmarks/src/sort.rs
index b2038c432f..f4b707611c 100644
--- a/benchmarks/src/sort.rs
+++ b/benchmarks/src/sort.rs
@@ -22,7 +22,7 @@ use crate::util::{AccessLogOpt, BenchmarkRun, CommonOpt};
use arrow::util::pretty;
use datafusion::common::Result;
-use datafusion::physical_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};
+use datafusion::physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion::physical_plan::collect;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
@@ -70,31 +70,28 @@ impl RunOpt {
let sort_cases = vec![
(
"sort utf8",
- vec![PhysicalSortExpr {
+ LexOrdering::new(vec![PhysicalSortExpr {
expr: col("request_method", &schema)?,
options: Default::default(),
- }],
+ }]),
),
(
"sort int",
- vec![PhysicalSortExpr {
- expr: col("request_bytes", &schema)?,
+ LexOrdering::new(vec![PhysicalSortExpr {
+ expr: col("response_bytes", &schema)?,
options: Default::default(),
- }],
+ }]),
),
(
"sort decimal",
- vec![
- // sort decimal
- PhysicalSortExpr {
- expr: col("decimal_price", &schema)?,
- options: Default::default(),
- },
- ],
+ LexOrdering::new(vec![PhysicalSortExpr {
+ expr: col("decimal_price", &schema)?,
+ options: Default::default(),
+ }]),
),
(
"sort integer tuple",
- vec![
+ LexOrdering::new(vec![
PhysicalSortExpr {
expr: col("request_bytes", &schema)?,
options: Default::default(),
@@ -103,11 +100,11 @@ impl RunOpt {
expr: col("response_bytes", &schema)?,
options: Default::default(),
},
- ],
+ ]),
),
(
"sort utf8 tuple",
- vec![
+ LexOrdering::new(vec![
// sort utf8 tuple
PhysicalSortExpr {
expr: col("service", &schema)?,
@@ -125,11 +122,11 @@ impl RunOpt {
expr: col("image", &schema)?,
options: Default::default(),
},
- ],
+ ]),
),
(
"sort mixed tuple",
- vec![
+ LexOrdering::new(vec![
PhysicalSortExpr {
expr: col("service", &schema)?,
options: Default::default(),
@@ -142,7 +139,7 @@ impl RunOpt {
expr: col("decimal_price", &schema)?,
options: Default::default(),
},
- ],
+ ]),
),
];
for (title, expr) in sort_cases {
@@ -170,13 +167,13 @@ impl RunOpt {
async fn exec_sort(
ctx: &SessionContext,
- expr: LexOrderingRef<'_>,
+ expr: &LexOrdering,
test_file: &TestParquetFile,
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let scan = test_file.create_scan(ctx, None).await?;
- let exec = Arc::new(SortExec::new(LexOrdering::new(expr.to_owned()),
scan));
+ let exec = Arc::new(SortExec::new(expr.clone(), scan));
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
let elapsed = start.elapsed();
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 541d464d38..3348adb103 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -99,9 +99,9 @@ dependencies = [
[[package]]
name = "anstyle"
-version = "1.0.9"
+version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8365de52b16c035ff4fcafe0092ba9390540e3e352870ac09933bebcaa2c8c56"
+checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
[[package]]
name = "anstyle-parse"
@@ -523,9 +523,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sso"
-version = "1.47.0"
+version = "1.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a8776850becacbd3a82a4737a9375ddb5c6832a51379f24443a98e61513f852c"
+checksum = "ded855583fa1d22e88fe39fd6062b062376e50a8211989e07cf5e38d52eb3453"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -545,9 +545,9 @@ dependencies = [
[[package]]
name = "aws-sdk-ssooidc"
-version = "1.48.0"
+version = "1.49.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0007b5b8004547133319b6c4e87193eee2a0bcb3e4c18c75d09febe9dab7b383"
+checksum = "9177ea1192e6601ae16c7273385690d88a7ed386a00b74a6bc894d12103cd933"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -567,9 +567,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sts"
-version = "1.47.0"
+version = "1.48.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9fffaa356e7f1c725908b75136d53207fa714e348f365671df14e95a60530ad3"
+checksum = "823ef553cf36713c97453e2ddff1eb8f62be7f4523544e2a5db64caf80100f0a"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -917,9 +917,9 @@ dependencies = [
[[package]]
name = "cc"
-version = "1.1.31"
+version = "1.1.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c2e7962b54006dcfcc61cb72735f4d89bb97061dd6a7ed882ec6b8ee53714c6f"
+checksum = "67b9470d453346108f93a59222a9a1a5724db32d0a4727b7ab7ace4b4d822dc9"
dependencies = [
"jobserver",
"libc",
@@ -1520,6 +1520,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr-common",
"hashbrown 0.14.5",
+ "itertools",
"rand",
]
@@ -3614,9 +3615,9 @@ checksum =
"13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
-version = "2.0.85"
+version = "2.0.87"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5023162dfcd14ef8f32034d8bcd4cc5ddc61ef7a247c024a33e24e1f24d21b56"
+checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d"
dependencies = [
"proc-macro2",
"quote",
@@ -3653,18 +3654,18 @@ checksum =
"3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76"
[[package]]
name = "thiserror"
-version = "1.0.65"
+version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5d11abd9594d9b38965ef50805c5e469ca9cc6f197f883f717e0269a3057b3d5"
+checksum = "5d171f59dbaa811dbbb1aee1e73db92ec2b122911a48e1390dfe327a821ddede"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
-version = "1.0.65"
+version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ae71770322cbd277e69d762a16c444af02aa0575ac0d174f0b9562d3b37f8602"
+checksum = "b08be0f17bd307950653ce45db00cd31200d82b624b36e181337d9c7d92765b5"
dependencies = [
"proc-macro2",
"quote",
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 74ab0126a5..6a162c97b6 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -35,7 +35,6 @@ use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion_common::stats::Precision;
use datafusion_common::{exec_err, ColumnStatistics, DataFusionError,
Statistics};
use datafusion_physical_expr::LexOrdering;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
use log::warn;
@@ -308,7 +307,7 @@ impl FileScanConfig {
pub fn split_groups_by_statistics(
table_schema: &SchemaRef,
file_groups: &[Vec<PartitionedFile>],
- sort_order: LexOrderingRef,
+ sort_order: &LexOrdering,
) -> Result<Vec<Vec<PartitionedFile>>> {
let flattened_files = file_groups.iter().flatten().collect::<Vec<_>>();
// First Fit:
@@ -1113,17 +1112,19 @@ mod tests {
))))
.collect::<Vec<_>>(),
));
- let sort_order = case
- .sort
- .into_iter()
- .map(|expr| {
- crate::physical_planner::create_physical_sort_expr(
- &expr,
- &DFSchema::try_from(table_schema.as_ref().clone())?,
- &ExecutionProps::default(),
- )
- })
- .collect::<Result<Vec<_>>>()?;
+ let sort_order = LexOrdering {
+ inner: case
+ .sort
+ .into_iter()
+ .map(|expr| {
+ crate::physical_planner::create_physical_sort_expr(
+ &expr,
+
&DFSchema::try_from(table_schema.as_ref().clone())?,
+ &ExecutionProps::default(),
+ )
+ })
+ .collect::<Result<Vec<_>>>()?,
+ };
let partitioned_files =
case.files.into_iter().map(From::from).collect::<Vec<_>>();
diff --git a/datafusion/core/src/datasource/physical_plan/statistics.rs
b/datafusion/core/src/datasource/physical_plan/statistics.rs
index 6af153a731..488098e786 100644
--- a/datafusion/core/src/datasource/physical_plan/statistics.rs
+++ b/datafusion/core/src/datasource/physical_plan/statistics.rs
@@ -36,7 +36,7 @@ use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use datafusion_common::{DataFusionError, Result};
use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
/// A normalized representation of file min/max statistics that allows for
efficient sorting & comparison.
/// The min/max values are ordered by [`Self::sort_order`].
@@ -50,7 +50,7 @@ pub(crate) struct MinMaxStatistics {
impl MinMaxStatistics {
/// Sort order used to sort the statistics
#[allow(unused)]
- pub fn sort_order(&self) -> LexOrderingRef {
+ pub fn sort_order(&self) -> &LexOrdering {
&self.sort_order
}
@@ -66,8 +66,8 @@ impl MinMaxStatistics {
}
pub fn new_from_files<'a>(
- projected_sort_order: LexOrderingRef, // Sort order with respect to
projected schema
- projected_schema: &SchemaRef, // Projected schema
+ projected_sort_order: &LexOrdering, // Sort order with respect to
projected schema
+ projected_schema: &SchemaRef, // Projected schema
projection: Option<&[usize]>, // Indices of projection in full table
schema (None = all columns)
files: impl IntoIterator<Item = &'a PartitionedFile>,
) -> Result<Self> {
@@ -119,15 +119,17 @@ impl MinMaxStatistics {
projected_schema
.project(&(sort_columns.iter().map(|c|
c.index()).collect::<Vec<_>>()))?,
);
- let min_max_sort_order = sort_columns
- .iter()
- .zip(projected_sort_order.iter())
- .enumerate()
- .map(|(i, (col, sort))| PhysicalSortExpr {
- expr: Arc::new(Column::new(col.name(), i)),
- options: sort.options,
- })
- .collect::<Vec<_>>();
+ let min_max_sort_order = LexOrdering {
+ inner: sort_columns
+ .iter()
+ .zip(projected_sort_order.iter())
+ .enumerate()
+ .map(|(i, (col, sort))| PhysicalSortExpr {
+ expr: Arc::new(Column::new(col.name(), i)),
+ options: sort.options,
+ })
+ .collect::<Vec<_>>(),
+ };
let (min_values, max_values): (Vec<_>, Vec<_>) = sort_columns
.iter()
@@ -167,7 +169,7 @@ impl MinMaxStatistics {
}
pub fn new(
- sort_order: LexOrderingRef,
+ sort_order: &LexOrdering,
schema: &SchemaRef,
min_values: RecordBatch,
max_values: RecordBatch,
@@ -257,7 +259,7 @@ impl MinMaxStatistics {
Ok(Self {
min_by_sort_order: min.map_err(|e| e.context("build min rows"))?,
max_by_sort_order: max.map_err(|e| e.context("build max rows"))?,
- sort_order: LexOrdering::from_ref(sort_order),
+ sort_order: sort_order.clone(),
})
}
@@ -278,7 +280,7 @@ impl MinMaxStatistics {
}
fn sort_columns_from_physical_sort_exprs(
- sort_order: LexOrderingRef,
+ sort_order: &LexOrdering,
) -> Option<Vec<&Column>> {
sort_order
.iter()
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 6cd902db72..6863978610 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -52,12 +52,12 @@ use
datafusion_physical_expr::utils::map_columns_before_projection;
use datafusion_physical_expr::{
physical_exprs_equal, EquivalenceProperties, PhysicalExpr, PhysicalExprRef,
};
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_optimizer::output_requirements::OutputRequirementExec;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::windows::{get_best_fitting_window,
BoundedWindowAggExec};
use datafusion_physical_plan::ExecutionPlanProperties;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::izip;
/// The `EnforceDistribution` rule ensures that distribution requirements are
@@ -936,7 +936,11 @@ fn add_spm_on_top(input: DistributionContext) ->
DistributionContext {
let new_plan = if should_preserve_ordering {
Arc::new(SortPreservingMergeExec::new(
-
LexOrdering::from_ref(input.plan.output_ordering().unwrap_or(&[])),
+ input
+ .plan
+ .output_ordering()
+ .unwrap_or(&LexOrdering::default())
+ .clone(),
input.plan.clone(),
)) as _
} else {
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 7b111cddc6..adc3d7cac1 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -62,7 +62,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan,
InputOrderMode};
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::{Partitioning, PhysicalSortRequirement};
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::repartition::RepartitionExec;
@@ -224,9 +224,9 @@ fn replace_with_partial_sort(
let sort_req =
PhysicalSortRequirement::from_sort_exprs(sort_plan.expr());
let mut common_prefix_length = 0;
- while child_eq_properties
- .ordering_satisfy_requirement(&sort_req[0..common_prefix_length +
1])
- {
+ while child_eq_properties.ordering_satisfy_requirement(&LexRequirement
{
+ inner: sort_req[0..common_prefix_length + 1].to_vec(),
+ }) {
common_prefix_length += 1;
}
if common_prefix_length > 0 {
@@ -392,7 +392,10 @@ fn analyze_immediate_sort_removal(
let sort_input = sort_exec.input();
// If this sort is unnecessary, we should remove it:
if sort_input.equivalence_properties().ordering_satisfy(
- sort_exec.properties().output_ordering().unwrap_or_default(),
+ sort_exec
+ .properties()
+ .output_ordering()
+ .unwrap_or(LexOrdering::empty()),
) {
node.plan = if !sort_exec.preserve_partitioning()
&& sort_input.output_partitioning().partition_count() > 1
@@ -632,10 +635,10 @@ fn remove_corresponding_sort_from_sub_plan(
Ok(node)
}
-/// Converts an [ExecutionPlan] trait object to a [LexOrderingRef] when
possible.
+/// Converts an [ExecutionPlan] trait object to a [LexOrdering] reference when
possible.
fn get_sort_exprs(
sort_any: &Arc<dyn ExecutionPlan>,
-) -> Result<(LexOrderingRef, Option<usize>)> {
+) -> Result<(&LexOrdering, Option<usize>)> {
if let Some(sort_exec) = sort_any.as_any().downcast_ref::<SortExec>() {
Ok((sort_exec.expr(), sort_exec.fetch()))
} else if let Some(spm) =
sort_any.as_any().downcast_ref::<SortPreservingMergeExec>()
diff --git
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 930ce52e6f..c80aea411f 100644
---
a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++
b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -129,11 +129,13 @@ fn plan_with_order_preserving_variants(
return Ok(sort_input);
} else if is_coalesce_partitions(&sort_input.plan) && is_spm_better {
let child = &sort_input.children[0].plan;
- if let Some(ordering) = child.output_ordering().map(Vec::from) {
+ if let Some(ordering) = child.output_ordering() {
// When the input of a `CoalescePartitionsExec` has an ordering,
// replace it with a `SortPreservingMergeExec` if appropriate:
- let spm =
- SortPreservingMergeExec::new(LexOrdering::new(ordering),
child.clone());
+ let spm = SortPreservingMergeExec::new(
+ LexOrdering::new(ordering.inner.clone()),
+ child.clone(),
+ );
sort_input.plan = Arc::new(spm) as _;
sort_input.children[0].data = true;
return Ok(sort_input);
@@ -257,7 +259,12 @@ pub(crate) fn replace_with_order_preserving_variants(
if alternate_plan
.plan
.equivalence_properties()
-
.ordering_satisfy(requirements.plan.output_ordering().unwrap_or_default())
+ .ordering_satisfy(
+ requirements
+ .plan
+ .output_ordering()
+ .unwrap_or(LexOrdering::empty()),
+ )
{
for child in alternate_plan.children.iter_mut() {
child.data = false;
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index 1a53077b1f..e231e719b2 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -36,10 +36,8 @@ use datafusion_common::{plan_err, HashSet, JoinSide, Result};
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::collect_columns;
-use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement};
-use datafusion_physical_expr_common::sort_expr::{
- LexOrdering, LexOrderingRef, LexRequirement,
-};
+use datafusion_physical_expr::PhysicalSortRequirement;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
/// This is a "data class" we use within the [`EnforceSorting`] rule to push
/// down [`SortExec`] in the plan. In some cases, we can reduce the total
@@ -87,11 +85,12 @@ fn pushdown_sorts_helper(
let parent_reqs = requirements
.data
.ordering_requirement
- .as_deref()
- .unwrap_or(&[]);
+ .clone()
+ .unwrap_or_default();
let satisfy_parent = plan
.equivalence_properties()
- .ordering_satisfy_requirement(parent_reqs);
+ .ordering_satisfy_requirement(&parent_reqs);
+
if is_sort(plan) {
let required_ordering = plan
.output_ordering()
@@ -139,7 +138,7 @@ fn pushdown_sorts_helper(
for (child, order) in requirements.children.iter_mut().zip(reqs) {
child.data.ordering_requirement = order;
}
- } else if let Some(adjusted) = pushdown_requirement_to_children(plan,
parent_reqs)? {
+ } else if let Some(adjusted) = pushdown_requirement_to_children(plan,
&parent_reqs)? {
// Can not satisfy the parent requirements, check whether we can push
// requirements down:
for (child, order) in requirements.children.iter_mut().zip(adjusted) {
@@ -162,14 +161,16 @@ fn pushdown_sorts_helper(
fn pushdown_requirement_to_children(
plan: &Arc<dyn ExecutionPlan>,
- parent_required: LexRequirementRef,
+ parent_required: &LexRequirement,
) -> Result<Option<Vec<Option<LexRequirement>>>> {
let maintains_input_order = plan.maintains_input_order();
if is_window(plan) {
let required_input_ordering = plan.required_input_ordering();
- let request_child =
required_input_ordering[0].as_deref().unwrap_or(&[]);
+ let request_child =
required_input_ordering[0].clone().unwrap_or_default();
let child_plan = plan.children().swap_remove(0);
- match determine_children_requirement(parent_required, request_child,
child_plan) {
+
+ match determine_children_requirement(parent_required, &request_child,
child_plan)
+ {
RequirementsCompatibility::Satisfy => {
let req = (!request_child.is_empty())
.then(|| LexRequirement::new(request_child.to_vec()));
@@ -180,7 +181,10 @@ fn pushdown_requirement_to_children(
}
} else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
let sort_req = PhysicalSortRequirement::from_sort_exprs(
- sort_exec.properties().output_ordering().unwrap_or(&[]),
+ sort_exec
+ .properties()
+ .output_ordering()
+ .unwrap_or(&LexOrdering::default()),
);
if sort_exec
.properties()
@@ -202,7 +206,9 @@ fn pushdown_requirement_to_children(
.all(|maintain| *maintain)
{
let output_req = PhysicalSortRequirement::from_sort_exprs(
- plan.properties().output_ordering().unwrap_or(&[]),
+ plan.properties()
+ .output_ordering()
+ .unwrap_or(&LexOrdering::default()),
);
// Push down through operator with fetch when:
// - requirement is aligned with output ordering
@@ -229,7 +235,11 @@ fn pushdown_requirement_to_children(
let left_columns_len = smj.left().schema().fields().len();
let parent_required_expr =
PhysicalSortRequirement::to_sort_exprs(parent_required.iter().cloned());
- match expr_source_side(&parent_required_expr, smj.join_type(),
left_columns_len) {
+ match expr_source_side(
+ parent_required_expr.as_ref(),
+ smj.join_type(),
+ left_columns_len,
+ ) {
Some(JoinSide::Left) => try_pushdown_requirements_to_join(
smj,
parent_required,
@@ -275,7 +285,8 @@ fn pushdown_requirement_to_children(
spm_eqs = spm_eqs.with_reorder(new_ordering);
// Do not push-down through SortPreservingMergeExec when
// ordering requirement invalidates requirement of sort preserving
merge exec.
- if
!spm_eqs.ordering_satisfy(plan.output_ordering().unwrap_or_default()) {
+ if
!spm_eqs.ordering_satisfy(&plan.output_ordering().cloned().unwrap_or_default())
+ {
Ok(None)
} else {
// Can push-down through SortPreservingMergeExec, because parent
requirement is finer
@@ -293,7 +304,7 @@ fn pushdown_requirement_to_children(
/// Return true if pushing the sort requirements through a node would violate
/// the input sorting requirements for the plan
fn pushdown_would_violate_requirements(
- parent_required: LexRequirementRef,
+ parent_required: &LexRequirement,
child: &dyn ExecutionPlan,
) -> bool {
child
@@ -319,8 +330,8 @@ fn pushdown_would_violate_requirements(
/// - If parent requirements are more specific, push down parent requirements.
/// - If they are not compatible, need to add a sort.
fn determine_children_requirement(
- parent_required: LexRequirementRef,
- request_child: LexRequirementRef,
+ parent_required: &LexRequirement,
+ request_child: &LexRequirement,
child_plan: &Arc<dyn ExecutionPlan>,
) -> RequirementsCompatibility {
if child_plan
@@ -345,8 +356,8 @@ fn determine_children_requirement(
fn try_pushdown_requirements_to_join(
smj: &SortMergeJoinExec,
- parent_required: LexRequirementRef,
- sort_expr: LexOrderingRef,
+ parent_required: &LexRequirement,
+ sort_expr: &LexOrdering,
push_side: JoinSide,
) -> Result<Option<Vec<Option<LexRequirement>>>> {
let left_eq_properties = smj.left().equivalence_properties();
@@ -354,13 +365,13 @@ fn try_pushdown_requirements_to_join(
let mut smj_required_orderings = smj.required_input_ordering();
let right_requirement = smj_required_orderings.swap_remove(1);
let left_requirement = smj_required_orderings.swap_remove(0);
- let left_ordering = smj.left().output_ordering().unwrap_or_default();
- let right_ordering = smj.right().output_ordering().unwrap_or_default();
+ let left_ordering =
&smj.left().output_ordering().cloned().unwrap_or_default();
+ let right_ordering =
&smj.right().output_ordering().cloned().unwrap_or_default();
+
let (new_left_ordering, new_right_ordering) = match push_side {
JoinSide::Left => {
- let left_eq_properties = left_eq_properties
- .clone()
- .with_reorder(LexOrdering::from_ref(sort_expr));
+ let left_eq_properties =
+ left_eq_properties.clone().with_reorder(sort_expr.clone());
if left_eq_properties
.ordering_satisfy_requirement(&left_requirement.unwrap_or_default())
{
@@ -371,9 +382,8 @@ fn try_pushdown_requirements_to_join(
}
}
JoinSide::Right => {
- let right_eq_properties = right_eq_properties
- .clone()
- .with_reorder(LexOrdering::from_ref(sort_expr));
+ let right_eq_properties =
+ right_eq_properties.clone().with_reorder(sort_expr.clone());
if right_eq_properties
.ordering_satisfy_requirement(&right_requirement.unwrap_or_default())
{
@@ -417,7 +427,7 @@ fn try_pushdown_requirements_to_join(
}
fn expr_source_side(
- required_exprs: LexOrderingRef,
+ required_exprs: &LexOrdering,
join_type: JoinType,
left_columns_len: usize,
) -> Option<JoinSide> {
@@ -469,7 +479,7 @@ fn expr_source_side(
}
fn shift_right_required(
- parent_required: LexRequirementRef,
+ parent_required: &LexRequirement,
left_columns_len: usize,
) -> Result<LexRequirement> {
let new_right_required = parent_required
@@ -507,7 +517,7 @@ fn shift_right_required(
/// pushed down, `Ok(None)` if not. On error, returns a `Result::Err`.
fn handle_custom_pushdown(
plan: &Arc<dyn ExecutionPlan>,
- parent_required: LexRequirementRef,
+ parent_required: &LexRequirement,
maintains_input_order: Vec<bool>,
) -> Result<Option<Vec<Option<LexRequirement>>>> {
// If there's no requirement from the parent or the plan has no children,
return early
diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
index d85278556c..c9363b00e1 100644
--- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
+++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
@@ -27,6 +27,7 @@ use
datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{
reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement,
};
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::aggregates::concat_slices;
use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
@@ -138,7 +139,7 @@ fn try_convert_aggregate_if_better(
aggr_exprs
.into_iter()
.map(|aggr_expr| {
- let aggr_sort_exprs = &aggr_expr.order_bys().unwrap_or_default();
+ let aggr_sort_exprs =
&aggr_expr.order_bys().cloned().unwrap_or_default();
let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs);
let aggr_sort_reqs =
PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs.iter());
@@ -151,14 +152,20 @@ fn try_convert_aggregate_if_better(
// Otherwise, leave it as is.
if aggr_expr.order_sensitivity().is_beneficial() &&
!aggr_sort_reqs.is_empty()
{
- let reqs = concat_slices(prefix_requirement, &aggr_sort_reqs);
+ let reqs = LexRequirement {
+ inner: concat_slices(prefix_requirement, &aggr_sort_reqs),
+ };
+
+ let prefix_requirement = LexRequirement {
+ inner: prefix_requirement.to_vec(),
+ };
+
if eq_properties.ordering_satisfy_requirement(&reqs) {
// Existing ordering satisfies the aggregator requirements:
aggr_expr.with_beneficial_ordering(true)?.map(Arc::new)
- } else if
eq_properties.ordering_satisfy_requirement(&concat_slices(
- prefix_requirement,
- &reverse_aggr_req,
- )) {
+ } else if
eq_properties.ordering_satisfy_requirement(&LexRequirement {
+ inner: concat_slices(&prefix_requirement,
&reverse_aggr_req),
+ }) {
// Converting to reverse enables more efficient execution
// given the existing ordering (if possible):
aggr_expr
diff --git a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
index 35da8b5963..921332bca5 100644
--- a/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
+++ b/datafusion/core/tests/fuzz_cases/equivalence/utils.rs
@@ -32,7 +32,7 @@ use datafusion_expr::sort_properties::{ExprProperties,
SortProperties};
use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
use datafusion_physical_expr::equivalence::{EquivalenceClass,
ProjectionMapping};
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::izip;
use rand::prelude::*;
@@ -465,7 +465,7 @@ pub fn generate_table_for_orderings(
// prune out rows that is invalid according to remaining orderings.
for ordering in orderings.iter().skip(1) {
- let sort_columns = get_sort_columns(&batch, ordering.as_ref())?;
+ let sort_columns = get_sort_columns(&batch, ordering)?;
// Collect sort options and values into separate vectors.
let (sort_options, sort_col_values): (Vec<_>, Vec<_>) = sort_columns
@@ -530,7 +530,7 @@ fn generate_random_f64_array(
// Helper function to get sort columns from a batch
fn get_sort_columns(
batch: &RecordBatch,
- ordering: LexOrderingRef,
+ ordering: &LexOrdering,
) -> Result<Vec<SortColumn>> {
ordering
.iter()
diff --git a/datafusion/functions-aggregate-common/src/accumulator.rs
b/datafusion/functions-aggregate-common/src/accumulator.rs
index 67ada56280..a230bb0289 100644
--- a/datafusion/functions-aggregate-common/src/accumulator.rs
+++ b/datafusion/functions-aggregate-common/src/accumulator.rs
@@ -19,7 +19,7 @@ use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::Result;
use datafusion_expr_common::accumulator::Accumulator;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use std::sync::Arc;
/// [`AccumulatorArgs`] contains information about how an aggregate
@@ -52,7 +52,7 @@ pub struct AccumulatorArgs<'a> {
/// ```
///
/// If no `ORDER BY` is specified, `ordering_req` will be empty.
- pub ordering_req: LexOrderingRef<'a>,
+ pub ordering_req: &'a LexOrdering,
/// Whether the aggregation is running in reverse order
pub is_reversed: bool,
diff --git a/datafusion/functions-aggregate-common/src/utils.rs
b/datafusion/functions-aggregate-common/src/utils.rs
index f55e5ec9a4..e440abe2de 100644
--- a/datafusion/functions-aggregate-common/src/utils.rs
+++ b/datafusion/functions-aggregate-common/src/utils.rs
@@ -30,7 +30,7 @@ use arrow::{
};
use datafusion_common::{exec_err, DataFusionError, Result};
use datafusion_expr_common::accumulator::Accumulator;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
/// Convert scalar values from an accumulator into arrays.
pub fn get_accum_scalar_values_as_arrays(
@@ -88,7 +88,7 @@ pub fn adjust_output_array(data_type: &DataType, array:
ArrayRef) -> Result<Arra
/// Construct corresponding fields for lexicographical ordering requirement
expression
pub fn ordering_fields(
- ordering_req: LexOrderingRef,
+ ordering_req: &LexOrdering,
// Data type of each expression in the ordering requirement
data_types: &[DataType],
) -> Vec<Field> {
@@ -107,7 +107,7 @@ pub fn ordering_fields(
}
/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
-pub fn get_sort_options(ordering_req: LexOrderingRef) -> Vec<SortOptions> {
+pub fn get_sort_options(ordering_req: &LexOrdering) -> Vec<SortOptions> {
ordering_req.iter().map(|item| item.options).collect()
}
diff --git a/datafusion/functions-aggregate/benches/count.rs
b/datafusion/functions-aggregate/benches/count.rs
index 1c8266ed5b..e6b62e6e18 100644
--- a/datafusion/functions-aggregate/benches/count.rs
+++ b/datafusion/functions-aggregate/benches/count.rs
@@ -23,7 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main,
Criterion};
use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl,
GroupsAccumulator};
use datafusion_functions_aggregate::count::Count;
use datafusion_physical_expr::expressions::col;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use std::sync::Arc;
fn prepare_accumulator() -> Box<dyn GroupsAccumulator> {
@@ -32,7 +32,7 @@ fn prepare_accumulator() -> Box<dyn GroupsAccumulator> {
return_type: &DataType::Int64,
schema: &schema,
ignore_nulls: false,
- ordering_req: LexOrderingRef::default(),
+ ordering_req: &LexOrdering::default(),
is_reversed: false,
name: "COUNT(f)",
is_distinct: false,
diff --git a/datafusion/functions-aggregate/benches/sum.rs
b/datafusion/functions-aggregate/benches/sum.rs
index 1e9493280e..1c180126a3 100644
--- a/datafusion/functions-aggregate/benches/sum.rs
+++ b/datafusion/functions-aggregate/benches/sum.rs
@@ -23,7 +23,7 @@ use criterion::{black_box, criterion_group, criterion_main,
Criterion};
use datafusion_expr::{function::AccumulatorArgs, AggregateUDFImpl,
GroupsAccumulator};
use datafusion_functions_aggregate::sum::Sum;
use datafusion_physical_expr::expressions::col;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use std::sync::Arc;
fn prepare_accumulator(data_type: &DataType) -> Box<dyn GroupsAccumulator> {
@@ -32,7 +32,7 @@ fn prepare_accumulator(data_type: &DataType) -> Box<dyn
GroupsAccumulator> {
return_type: data_type,
schema: &schema,
ignore_nulls: false,
- ordering_req: LexOrderingRef::default(),
+ ordering_req: &LexOrdering::default(),
is_reversed: false,
name: "SUM(f)",
is_distinct: false,
diff --git a/datafusion/functions-aggregate/src/array_agg.rs
b/datafusion/functions-aggregate/src/array_agg.rs
index 7c22c21e38..252a07cb11 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -135,7 +135,7 @@ impl AggregateUDFImpl for ArrayAgg {
OrderSensitiveArrayAggAccumulator::try_new(
&data_type,
&ordering_dtypes,
- LexOrdering::from_ref(acc_args.ordering_req),
+ acc_args.ordering_req.clone(),
acc_args.is_reversed,
)
.map(|acc| Box::new(acc) as _)
diff --git a/datafusion/functions-aggregate/src/first_last.rs
b/datafusion/functions-aggregate/src/first_last.rs
index 0b05713499..3ca1422668 100644
--- a/datafusion/functions-aggregate/src/first_last.rs
+++ b/datafusion/functions-aggregate/src/first_last.rs
@@ -37,7 +37,7 @@ use datafusion_expr::{
ExprFunctionExt, Signature, SortExpr, TypeSignature, Volatility,
};
use datafusion_functions_aggregate_common::utils::get_sort_options;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
create_func!(FirstValue, first_value_udaf);
@@ -130,7 +130,7 @@ impl AggregateUDFImpl for FirstValue {
FirstValueAccumulator::try_new(
acc_args.return_type,
&ordering_dtypes,
- LexOrdering::from_ref(acc_args.ordering_req),
+ acc_args.ordering_req.clone(),
acc_args.ignore_nulls,
)
.map(|acc|
Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
@@ -455,7 +455,7 @@ impl AggregateUDFImpl for LastValue {
LastValueAccumulator::try_new(
acc_args.return_type,
&ordering_dtypes,
- LexOrdering::from_ref(acc_args.ordering_req),
+ acc_args.ordering_req.clone(),
acc_args.ignore_nulls,
)
.map(|acc|
Box::new(acc.with_requirement_satisfied(requirement_satisfied)) as _)
@@ -723,10 +723,7 @@ fn filter_states_according_to_is_set(
}
/// Combines array refs and their corresponding orderings to construct
`SortColumn`s.
-fn convert_to_sort_cols(
- arrs: &[ArrayRef],
- sort_exprs: LexOrderingRef,
-) -> Vec<SortColumn> {
+fn convert_to_sort_cols(arrs: &[ArrayRef], sort_exprs: &LexOrdering) ->
Vec<SortColumn> {
arrs.iter()
.zip(sort_exprs.iter())
.map(|(item, sort_expr)| SortColumn {
diff --git a/datafusion/functions-aggregate/src/nth_value.rs
b/datafusion/functions-aggregate/src/nth_value.rs
index 5f3a8cf2f1..f3e892fa73 100644
--- a/datafusion/functions-aggregate/src/nth_value.rs
+++ b/datafusion/functions-aggregate/src/nth_value.rs
@@ -133,7 +133,7 @@ impl AggregateUDFImpl for NthValueAgg {
n,
&data_type,
&ordering_dtypes,
- LexOrdering::from_ref(acc_args.ordering_req),
+ acc_args.ordering_req.clone(),
)
.map(|acc| Box::new(acc) as _)
}
diff --git a/datafusion/functions-aggregate/src/stddev.rs
b/datafusion/functions-aggregate/src/stddev.rs
index 95269ed821..b785d8e985 100644
--- a/datafusion/functions-aggregate/src/stddev.rs
+++ b/datafusion/functions-aggregate/src/stddev.rs
@@ -410,7 +410,7 @@ mod tests {
use datafusion_expr::AggregateUDF;
use
datafusion_functions_aggregate_common::utils::get_accum_scalar_values_as_arrays;
use datafusion_physical_expr::expressions::col;
- use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+ use datafusion_physical_expr_common::sort_expr::LexOrdering;
use std::sync::Arc;
#[test]
@@ -462,7 +462,7 @@ mod tests {
return_type: &DataType::Float64,
schema,
ignore_nulls: false,
- ordering_req: LexOrderingRef::default(),
+ ordering_req: &LexOrdering::default(),
name: "a",
is_distinct: false,
is_reversed: false,
@@ -473,7 +473,7 @@ mod tests {
return_type: &DataType::Float64,
schema,
ignore_nulls: false,
- ordering_req: LexOrderingRef::default(),
+ ordering_req: &LexOrdering::default(),
name: "a",
is_distinct: false,
is_reversed: false,
diff --git a/datafusion/physical-expr-common/Cargo.toml
b/datafusion/physical-expr-common/Cargo.toml
index 45ccb08e52..ad27c9d49c 100644
--- a/datafusion/physical-expr-common/Cargo.toml
+++ b/datafusion/physical-expr-common/Cargo.toml
@@ -41,4 +41,5 @@ arrow = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr-common = { workspace = true }
hashbrown = { workspace = true }
+itertools = { workspace = true }
rand = { workspace = true }
diff --git a/datafusion/physical-expr-common/src/sort_expr.rs
b/datafusion/physical-expr-common/src/sort_expr.rs
index addf2fbfca..f91d583215 100644
--- a/datafusion/physical-expr-common/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -22,7 +22,7 @@ use std::fmt;
use std::fmt::{Display, Formatter};
use std::hash::{Hash, Hasher};
use std::ops::{Deref, Index, Range, RangeFrom, RangeTo};
-use std::sync::Arc;
+use std::sync::{Arc, OnceLock};
use std::vec::IntoIter;
use arrow::compute::kernels::sort::{SortColumn, SortOptions};
@@ -30,6 +30,7 @@ use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_expr_common::columnar_value::ColumnarValue;
+use itertools::Itertools;
/// Represents Sort operation for a column in a RecordBatch
///
@@ -218,7 +219,7 @@ impl From<PhysicalSortRequirement> for PhysicalSortExpr {
/// If options is `None`, the default sort options `ASC, NULLS LAST` is
used.
///
/// The default is picked to be consistent with
- /// PostgreSQL:
<https://www.postgresql.org/docs/current/queries-order.html>
+ /// PostgreSQL:
<https://www.postgresql.org/docs/current/queries-order.html>
fn from(value: PhysicalSortRequirement) -> Self {
let options = value.options.unwrap_or(SortOptions {
descending: false,
@@ -309,13 +310,8 @@ impl PhysicalSortRequirement {
pub fn from_sort_exprs<'a>(
ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
) -> LexRequirement {
- LexRequirement::new(
- ordering
- .into_iter()
- .cloned()
- .map(PhysicalSortRequirement::from)
- .collect(),
- )
+ let ordering = ordering.into_iter().cloned().collect();
+ LexRequirement::from_lex_ordering(ordering)
}
/// Converts an iterator of [`PhysicalSortRequirement`] into a Vec
@@ -327,10 +323,8 @@ impl PhysicalSortRequirement {
pub fn to_sort_exprs(
requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
) -> LexOrdering {
- requirements
- .into_iter()
- .map(PhysicalSortExpr::from)
- .collect()
+ let requirements = requirements.into_iter().collect();
+ LexOrdering::from_lex_requirement(requirements)
}
}
@@ -352,14 +346,23 @@ pub struct LexOrdering {
pub inner: Vec<PhysicalSortExpr>,
}
+impl AsRef<LexOrdering> for LexOrdering {
+ fn as_ref(&self) -> &LexOrdering {
+ self
+ }
+}
+
+static EMPTY_ORDER: OnceLock<LexOrdering> = OnceLock::new();
+
impl LexOrdering {
// Creates a new [`LexOrdering`] from a vector
pub fn new(inner: Vec<PhysicalSortExpr>) -> Self {
Self { inner }
}
- pub fn as_ref(&self) -> LexOrderingRef {
- &self.inner
+ /// Return an empty LexOrdering (no expressions)
+ pub fn empty() -> &'static LexOrdering {
+ EMPTY_ORDER.get_or_init(LexOrdering::default)
}
pub fn capacity(&self) -> usize {
@@ -378,10 +381,6 @@ impl LexOrdering {
self.inner.extend(iter)
}
- pub fn from_ref(lex_ordering_ref: LexOrderingRef) -> Self {
- Self::new(lex_ordering_ref.to_vec())
- }
-
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
@@ -409,6 +408,36 @@ impl LexOrdering {
pub fn truncate(&mut self, len: usize) {
self.inner.truncate(len)
}
+
+ /// Merge the contents of `other` into `self`, removing duplicates.
+ pub fn merge(mut self, other: LexOrdering) -> Self {
+ self.inner = self.inner.into_iter().chain(other).unique().collect();
+ self
+ }
+
+ /// Converts a `LexRequirement` into a `LexOrdering`.
+ ///
+ /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
+ /// for each entry in the input. If the required ordering is `None` for an
+ /// entry, the default ordering `ASC, NULLS LAST` is used (see
+ /// `PhysicalSortExpr::from`).
+ pub fn from_lex_requirement(requirements: LexRequirement) -> LexOrdering {
+ requirements
+ .into_iter()
+ .map(PhysicalSortExpr::from)
+ .collect()
+ }
+}
+
+impl From<Vec<PhysicalSortExpr>> for LexOrdering {
+ fn from(value: Vec<PhysicalSortExpr>) -> Self {
+ Self::new(value)
+ }
+}
+
+impl From<LexRequirement> for LexOrdering {
+ fn from(value: LexRequirement) -> Self {
+ Self::from_lex_requirement(value)
+ }
}
impl Deref for LexOrdering {
@@ -489,6 +518,7 @@ impl IntoIterator for LexOrdering {
///`LexOrderingRef` is an alias for the type &`[PhysicalSortExpr]`, which
represents
/// a reference to a lexicographical ordering.
+#[deprecated(since = "43.0.0", note = "use &LexOrdering instead")]
pub type LexOrderingRef<'a> = &'a [PhysicalSortExpr];
///`LexRequirement` is an struct containing a `Vec<PhysicalSortRequirement>`,
which
@@ -514,6 +544,30 @@ impl LexRequirement {
pub fn push(&mut self, physical_sort_requirement: PhysicalSortRequirement)
{
self.inner.push(physical_sort_requirement)
}
+
+ /// Create a new [`LexRequirement`] from a [`LexOrdering`].
+ ///
+ /// Returns a [`LexRequirement`] whose [`PhysicalSortRequirement`]s require
+ /// the exact sort of the [`PhysicalSortExpr`]s in `ordering`.
+ ///
+ /// This method takes the [`LexOrdering`] by value, which makes it easy to
+ /// use when implementing [`ExecutionPlan::required_input_ordering`].
+ ///
+ /// [`ExecutionPlan::required_input_ordering`]:
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering
+ pub fn from_lex_ordering(ordering: LexOrdering) -> Self {
+ Self::new(
+ ordering
+ .into_iter()
+ .map(PhysicalSortRequirement::from)
+ .collect(),
+ )
+ }
+}
+
+impl From<LexOrdering> for LexRequirement {
+ fn from(value: LexOrdering) -> Self {
+ Self::from_lex_ordering(value)
+ }
}
impl Deref for LexRequirement {
@@ -545,6 +599,16 @@ impl IntoIterator for LexRequirement {
}
}
+impl<'a> IntoIterator for &'a LexOrdering {
+ type Item = &'a PhysicalSortExpr;
+ type IntoIter = std::slice::Iter<'a, PhysicalSortExpr>;
+
+ fn into_iter(self) -> Self::IntoIter {
+ self.inner.iter()
+ }
+}
+
///`LexRequirementRef` is an alias for the type &`[PhysicalSortRequirement]`,
which
/// represents a reference to a lexicographical ordering requirement.
+/// #[deprecated(since = "43.0.0", note = "use &LexRequirement instead")]
pub type LexRequirementRef<'a> = &'a [PhysicalSortRequirement];
diff --git a/datafusion/physical-expr-common/src/utils.rs
b/datafusion/physical-expr-common/src/utils.rs
index 26293b1a76..ffdab6c6d3 100644
--- a/datafusion/physical-expr-common/src/utils.rs
+++ b/datafusion/physical-expr-common/src/utils.rs
@@ -24,7 +24,7 @@ use datafusion_common::Result;
use datafusion_expr_common::sort_properties::ExprProperties;
use crate::physical_expr::PhysicalExpr;
-use crate::sort_expr::{LexOrdering, LexOrderingRef, PhysicalSortExpr};
+use crate::sort_expr::{LexOrdering, PhysicalSortExpr};
use crate::tree_node::ExprContext;
/// Represents a [`PhysicalExpr`] node with associated properties (order and
@@ -96,7 +96,7 @@ pub fn scatter(mask: &BooleanArray, truthy: &dyn Array) ->
Result<ArrayRef> {
/// Reverses the ORDER BY expression, which is useful during equivalent window
/// expression construction. For instance, 'ORDER BY a ASC, NULLS LAST' turns
into
/// 'ORDER BY a DESC, NULLS FIRST'.
-pub fn reverse_order_bys(order_bys: LexOrderingRef) -> LexOrdering {
+pub fn reverse_order_bys(order_bys: &LexOrdering) -> LexOrdering {
order_bys
.iter()
.map(|e| PhysicalSortExpr::new(e.expr.clone(), !e.options))
diff --git a/datafusion/physical-expr/src/aggregate.rs
b/datafusion/physical-expr/src/aggregate.rs
index e446776aff..5dc1389334 100644
--- a/datafusion/physical-expr/src/aggregate.rs
+++ b/datafusion/physical-expr/src/aggregate.rs
@@ -45,7 +45,7 @@ use
datafusion_functions_aggregate_common::accumulator::AccumulatorArgs;
use datafusion_functions_aggregate_common::accumulator::StateFieldsArgs;
use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_expr_common::utils::reverse_order_bys;
use datafusion_expr_common::groups_accumulator::GroupsAccumulator;
@@ -292,7 +292,7 @@ impl AggregateFunctionExpr {
/// Order by requirements for the aggregate function
/// By default it is `None` (there is no requirement)
/// Order-sensitive aggregators, such as `FIRST_VALUE(x ORDER BY y)`
should implement this
- pub fn order_bys(&self) -> Option<LexOrderingRef> {
+ pub fn order_bys(&self) -> Option<&LexOrdering> {
if self.ordering_req.is_empty() {
return None;
}
@@ -490,7 +490,10 @@ impl AggregateFunctionExpr {
/// These expressions are (1)function arguments, (2) order by expressions.
pub fn all_expressions(&self) -> AggregatePhysicalExpressions {
let args = self.expressions();
- let order_bys = self.order_bys().unwrap_or_default();
+ let order_bys = self
+ .order_bys()
+ .cloned()
+ .unwrap_or_else(LexOrdering::default);
let order_by_exprs = order_bys
.iter()
.map(|sort_expr| Arc::clone(&sort_expr.expr))
diff --git a/datafusion/physical-expr/src/equivalence/class.rs
b/datafusion/physical-expr/src/equivalence/class.rs
index 7305bc1b0a..35ff6f685b 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -21,9 +21,8 @@ use std::sync::Arc;
use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
use crate::{
expressions::Column, physical_expr::deduplicate_physical_exprs,
- physical_exprs_bag_equal, physical_exprs_contains, LexOrdering,
LexOrderingRef,
- LexRequirement, LexRequirementRef, PhysicalExpr, PhysicalExprRef,
PhysicalSortExpr,
- PhysicalSortRequirement,
+ physical_exprs_bag_equal, physical_exprs_contains, LexOrdering,
LexRequirement,
+ PhysicalExpr, PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement,
};
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -475,7 +474,7 @@ impl EquivalenceGroup {
/// This function applies the `normalize_sort_expr` function for all sort
/// expressions in `sort_exprs` and returns the corresponding normalized
/// sort expressions.
- pub fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) ->
LexOrdering {
+ pub fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) ->
LexOrdering {
// Convert sort expressions to sort requirements:
let sort_reqs =
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
// Normalize the requirements:
@@ -489,7 +488,7 @@ impl EquivalenceGroup {
/// sort requirements.
pub fn normalize_sort_requirements(
&self,
- sort_reqs: LexRequirementRef,
+ sort_reqs: &LexRequirement,
) -> LexRequirement {
collapse_lex_req(LexRequirement::new(
sort_reqs
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs
b/datafusion/physical-expr/src/equivalence/ordering.rs
index 838c9800f9..06f85b657e 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -146,12 +146,7 @@ impl OrderingEquivalenceClass {
/// Returns the concatenation of all the orderings. This enables merge
/// operations to preserve all equivalent orderings simultaneously.
pub fn output_ordering(&self) -> Option<LexOrdering> {
- let output_ordering = self
- .orderings
- .iter()
- .flat_map(|ordering| ordering.as_ref())
- .cloned()
- .collect();
+ let output_ordering =
self.orderings.iter().flatten().cloned().collect();
let output_ordering = collapse_lex_ordering(output_ordering);
(!output_ordering.is_empty()).then_some(output_ordering)
}
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
index 55c99e93d0..061e77222f 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -30,9 +30,8 @@ use crate::equivalence::{
};
use crate::expressions::{with_new_schema, CastExpr, Column, Literal};
use crate::{
- physical_exprs_contains, ConstExpr, LexOrdering, LexOrderingRef,
LexRequirement,
- LexRequirementRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
- PhysicalSortRequirement,
+ physical_exprs_contains, ConstExpr, LexOrdering, LexRequirement,
PhysicalExpr,
+ PhysicalExprRef, PhysicalSortExpr, PhysicalSortRequirement,
};
use arrow_schema::{SchemaRef, SortOptions};
@@ -197,7 +196,7 @@ impl EquivalenceProperties {
OrderingEquivalenceClass::new(
self.oeq_class
.iter()
- .map(|ordering| self.normalize_sort_exprs(ordering.as_ref()))
+ .map(|ordering| self.normalize_sort_exprs(ordering))
.collect(),
)
}
@@ -408,7 +407,7 @@ impl EquivalenceProperties {
/// function would return `vec![a ASC, c ASC]`. Internally, it would first
/// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final
result
/// after deduplication.
- fn normalize_sort_exprs(&self, sort_exprs: LexOrderingRef) -> LexOrdering {
+ fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
// Convert sort expressions to sort requirements:
let sort_reqs =
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
// Normalize the requirements:
@@ -430,10 +429,7 @@ impl EquivalenceProperties {
/// function would return `vec![a ASC, c ASC]`. Internally, it would first
/// normalize to `vec![a ASC, c ASC, a ASC]` and end up with the final
result
/// after deduplication.
- fn normalize_sort_requirements(
- &self,
- sort_reqs: LexRequirementRef,
- ) -> LexRequirement {
+ fn normalize_sort_requirements(&self, sort_reqs: &LexRequirement) ->
LexRequirement {
let normalized_sort_reqs =
self.eq_group.normalize_sort_requirements(sort_reqs);
let mut constant_exprs = vec![];
constant_exprs.extend(
@@ -456,7 +452,7 @@ impl EquivalenceProperties {
/// Checks whether the given ordering is satisfied by any of the existing
/// orderings.
- pub fn ordering_satisfy(&self, given: LexOrderingRef) -> bool {
+ pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool {
// Convert the given sort expressions to sort requirements:
let sort_requirements =
PhysicalSortRequirement::from_sort_exprs(given.iter());
self.ordering_satisfy_requirement(&sort_requirements)
@@ -464,7 +460,7 @@ impl EquivalenceProperties {
/// Checks whether the given sort requirements are satisfied by any of the
/// existing orderings.
- pub fn ordering_satisfy_requirement(&self, reqs: LexRequirementRef) ->
bool {
+ pub fn ordering_satisfy_requirement(&self, reqs: &LexRequirement) -> bool {
let mut eq_properties = self.clone();
// First, standardize the given requirement:
let normalized_reqs = eq_properties.normalize_sort_requirements(reqs);
@@ -525,8 +521,8 @@ impl EquivalenceProperties {
/// than the `reference` sort requirements.
pub fn requirements_compatible(
&self,
- given: LexRequirementRef,
- reference: LexRequirementRef,
+ given: &LexRequirement,
+ reference: &LexRequirement,
) -> bool {
let normalized_given = self.normalize_sort_requirements(given);
let normalized_reference = self.normalize_sort_requirements(reference);
@@ -548,8 +544,8 @@ impl EquivalenceProperties {
/// the latter.
pub fn get_finer_ordering(
&self,
- lhs: LexOrderingRef,
- rhs: LexOrderingRef,
+ lhs: &LexOrdering,
+ rhs: &LexOrdering,
) -> Option<LexOrdering> {
// Convert the given sort expressions to sort requirements:
let lhs = PhysicalSortRequirement::from_sort_exprs(lhs);
@@ -569,8 +565,8 @@ impl EquivalenceProperties {
/// is the latter.
pub fn get_finer_requirement(
&self,
- req1: LexRequirementRef,
- req2: LexRequirementRef,
+ req1: &LexRequirement,
+ req2: &LexRequirement,
) -> Option<LexRequirement> {
let mut lhs = self.normalize_sort_requirements(req1);
let mut rhs = self.normalize_sort_requirements(req2);
@@ -606,7 +602,7 @@ impl EquivalenceProperties {
pub fn substitute_ordering_component(
&self,
mapping: &ProjectionMapping,
- sort_expr: LexOrderingRef,
+ sort_expr: &LexOrdering,
) -> Result<Vec<LexOrdering>> {
let new_orderings = sort_expr
.iter()
@@ -656,7 +652,7 @@ impl EquivalenceProperties {
let orderings = &self.oeq_class.orderings;
let new_order = orderings
.iter()
- .map(|order| self.substitute_ordering_component(mapping,
order.as_ref()))
+ .map(|order| self.substitute_ordering_component(mapping, order))
.collect::<Result<Vec<_>>>()?;
let new_order = new_order.into_iter().flatten().collect();
self.oeq_class = OrderingEquivalenceClass::new(new_order);
diff --git a/datafusion/physical-expr/src/lib.rs
b/datafusion/physical-expr/src/lib.rs
index e7c2b4119c..405b6bbd69 100644
--- a/datafusion/physical-expr/src/lib.rs
+++ b/datafusion/physical-expr/src/lib.rs
@@ -54,8 +54,7 @@ pub use physical_expr::{
pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
pub use datafusion_physical_expr_common::sort_expr::{
- LexOrdering, LexOrderingRef, LexRequirement, LexRequirementRef,
PhysicalSortExpr,
- PhysicalSortRequirement,
+ LexOrdering, LexRequirement, PhysicalSortExpr, PhysicalSortRequirement,
};
pub use planner::{create_physical_expr, create_physical_exprs};
diff --git a/datafusion/physical-expr/src/utils/mod.rs
b/datafusion/physical-expr/src/utils/mod.rs
index 73d744b4b6..30cfecf0e2 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -33,7 +33,7 @@ use datafusion_common::tree_node::{
use datafusion_common::{HashMap, HashSet, Result};
use datafusion_expr::Operator;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use itertools::Itertools;
use petgraph::graph::NodeIndex;
use petgraph::stable_graph::StableGraph;
@@ -244,7 +244,7 @@ pub fn reassign_predicate_columns(
}
/// Merge left and right sort expressions, checking for duplicates.
-pub fn merge_vectors(left: LexOrderingRef, right: LexOrderingRef) ->
LexOrdering {
+pub fn merge_vectors(left: &LexOrdering, right: &LexOrdering) -> LexOrdering {
left.iter()
.cloned()
.chain(right.iter().cloned())
diff --git a/datafusion/physical-expr/src/window/aggregate.rs
b/datafusion/physical-expr/src/window/aggregate.rs
index 94960c95e4..0c56bdc929 100644
--- a/datafusion/physical-expr/src/window/aggregate.rs
+++ b/datafusion/physical-expr/src/window/aggregate.rs
@@ -34,7 +34,7 @@ use crate::{reverse_order_bys, PhysicalExpr};
use datafusion_common::ScalarValue;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{Accumulator, WindowFrame};
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
/// A window expr that takes the form of an aggregate function.
///
@@ -52,13 +52,13 @@ impl PlainAggregateWindowExpr {
pub fn new(
aggregate: Arc<AggregateFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
- order_by: LexOrderingRef,
+ order_by: &LexOrdering,
window_frame: Arc<WindowFrame>,
) -> Self {
Self {
aggregate,
partition_by: partition_by.to_vec(),
- order_by: LexOrdering::from_ref(order_by),
+ order_by: order_by.clone(),
window_frame,
}
}
@@ -124,7 +124,7 @@ impl WindowExpr for PlainAggregateWindowExpr {
&self.partition_by
}
- fn order_by(&self) -> LexOrderingRef {
+ fn order_by(&self) -> &LexOrdering {
self.order_by.as_ref()
}
diff --git a/datafusion/physical-expr/src/window/built_in.rs
b/datafusion/physical-expr/src/window/built_in.rs
index 5f6c5e5c2c..0f6c3f9218 100644
--- a/datafusion/physical-expr/src/window/built_in.rs
+++ b/datafusion/physical-expr/src/window/built_in.rs
@@ -33,7 +33,7 @@ use datafusion_common::utils::evaluate_partition_ranges;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::window_state::{WindowAggState, WindowFrameContext};
use datafusion_expr::WindowFrame;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
/// A window expr that takes the form of a [`BuiltInWindowFunctionExpr`].
#[derive(Debug)]
@@ -49,13 +49,13 @@ impl BuiltInWindowExpr {
pub fn new(
expr: Arc<dyn BuiltInWindowFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
- order_by: LexOrderingRef,
+ order_by: &LexOrdering,
window_frame: Arc<WindowFrame>,
) -> Self {
Self {
expr,
partition_by: partition_by.to_vec(),
- order_by: LexOrdering::from_ref(order_by),
+ order_by: order_by.clone(),
window_frame,
}
}
@@ -118,7 +118,7 @@ impl WindowExpr for BuiltInWindowExpr {
&self.partition_by
}
- fn order_by(&self) -> LexOrderingRef {
+ fn order_by(&self) -> &LexOrdering {
self.order_by.as_ref()
}
diff --git a/datafusion/physical-expr/src/window/sliding_aggregate.rs
b/datafusion/physical-expr/src/window/sliding_aggregate.rs
index 1e46baae7b..572eb8866a 100644
--- a/datafusion/physical-expr/src/window/sliding_aggregate.rs
+++ b/datafusion/physical-expr/src/window/sliding_aggregate.rs
@@ -33,7 +33,7 @@ use crate::window::{
use crate::{expressions::PhysicalSortExpr, reverse_order_bys, PhysicalExpr};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{Accumulator, WindowFrame};
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexOrderingRef};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
/// A window expr that takes the form of an aggregate function that
/// can be incrementally computed over sliding windows.
@@ -52,13 +52,13 @@ impl SlidingAggregateWindowExpr {
pub fn new(
aggregate: Arc<AggregateFunctionExpr>,
partition_by: &[Arc<dyn PhysicalExpr>],
- order_by: LexOrderingRef,
+ order_by: &LexOrdering,
window_frame: Arc<WindowFrame>,
) -> Self {
Self {
aggregate,
partition_by: partition_by.to_vec(),
- order_by: LexOrdering::from_ref(order_by),
+ order_by: order_by.clone(),
window_frame,
}
}
@@ -108,7 +108,7 @@ impl WindowExpr for SlidingAggregateWindowExpr {
&self.partition_by
}
- fn order_by(&self) -> LexOrderingRef {
+ fn order_by(&self) -> &LexOrdering {
self.order_by.as_ref()
}
diff --git a/datafusion/physical-expr/src/window/window_expr.rs
b/datafusion/physical-expr/src/window/window_expr.rs
index 0f882def44..828e5ad206 100644
--- a/datafusion/physical-expr/src/window/window_expr.rs
+++ b/datafusion/physical-expr/src/window/window_expr.rs
@@ -20,7 +20,7 @@ use std::fmt::Debug;
use std::ops::Range;
use std::sync::Arc;
-use crate::{LexOrderingRef, PhysicalExpr};
+use crate::{LexOrdering, PhysicalExpr};
use arrow::array::{new_empty_array, Array, ArrayRef};
use arrow::compute::kernels::sort::SortColumn;
@@ -109,7 +109,7 @@ pub trait WindowExpr: Send + Sync + Debug {
fn partition_by(&self) -> &[Arc<dyn PhysicalExpr>];
/// Expressions that's from the window function's order by clause, empty
if absent
- fn order_by(&self) -> LexOrderingRef;
+ fn order_by(&self) -> &LexOrdering;
/// Get order by columns, empty if absent
fn order_by_columns(&self, batch: &RecordBatch) -> Result<Vec<SortColumn>>
{
@@ -344,7 +344,7 @@ pub(crate) fn is_end_bound_safe(
window_frame_ctx: &WindowFrameContext,
order_bys: &[ArrayRef],
most_recent_order_bys: Option<&[ArrayRef]>,
- sort_exprs: LexOrderingRef,
+ sort_exprs: &LexOrdering,
idx: usize,
) -> Result<bool> {
if sort_exprs.is_empty() {
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index 5ffe797c5c..a71039b573 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -940,7 +940,7 @@ fn get_aggregate_expr_req(
return LexOrdering::default();
}
- let mut req =
LexOrdering::from_ref(aggr_expr.order_bys().unwrap_or_default());
+ let mut req = aggr_expr.order_bys().cloned().unwrap_or_default();
// In non-first stage modes, we accumulate data (using `merge_batch`) from
// different partitions (i.e. merge partial results). During this merge, we
@@ -983,7 +983,7 @@ fn finer_ordering(
agg_mode: &AggregateMode,
) -> Option<LexOrdering> {
let aggr_req = get_aggregate_expr_req(aggr_expr, group_by, agg_mode);
- eq_properties.get_finer_ordering(existing_req.as_ref(), aggr_req.as_ref())
+ eq_properties.get_finer_ordering(existing_req, aggr_req.as_ref())
}
/// Concatenates the given slices.
diff --git a/datafusion/physical-plan/src/aggregates/order/mod.rs
b/datafusion/physical-plan/src/aggregates/order/mod.rs
index 24846d2395..7d9a50e20a 100644
--- a/datafusion/physical-plan/src/aggregates/order/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/order/mod.rs
@@ -19,7 +19,7 @@ use arrow_array::ArrayRef;
use arrow_schema::Schema;
use datafusion_common::Result;
use datafusion_expr::EmitTo;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use std::mem::size_of;
mod full;
@@ -45,7 +45,7 @@ impl GroupOrdering {
pub fn try_new(
input_schema: &Schema,
mode: &InputOrderMode,
- ordering: LexOrderingRef,
+ ordering: &LexOrdering,
) -> Result<Self> {
match mode {
InputOrderMode::Linear => Ok(GroupOrdering::None),
diff --git a/datafusion/physical-plan/src/aggregates/order/partial.rs b/datafusion/physical-plan/src/aggregates/order/partial.rs
index 5cc55dc0d0..5a05b88798 100644
--- a/datafusion/physical-plan/src/aggregates/order/partial.rs
+++ b/datafusion/physical-plan/src/aggregates/order/partial.rs
@@ -21,7 +21,7 @@ use arrow_schema::Schema;
use datafusion_common::Result;
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use datafusion_expr::EmitTo;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use std::mem::size_of;
use std::sync::Arc;
@@ -107,7 +107,7 @@ impl GroupOrderingPartial {
pub fn try_new(
input_schema: &Schema,
order_indices: &[usize],
- ordering: LexOrderingRef,
+ ordering: &LexOrdering,
) -> Result<Self> {
assert!(!order_indices.is_empty());
assert!(order_indices.len() <= ordering.len());
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index d65320dbab..7220e7594e 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -38,7 +38,7 @@ pub use datafusion_physical_expr::{
expressions, udf, Distribution, Partitioning, PhysicalExpr,
};
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
-use datafusion_physical_expr_common::sort_expr::{LexOrderingRef, LexRequirement};
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::display::DisplayableExecutionPlan;
@@ -443,7 +443,7 @@ pub trait ExecutionPlanProperties {
/// For example, `SortExec` (obviously) produces sorted output as does
/// `SortPreservingMergeStream`. Less obviously, `Projection` produces sorted
/// output if its input is sorted as it does not reorder the input rows.
- fn output_ordering(&self) -> Option<LexOrderingRef>;
+ fn output_ordering(&self) -> Option<&LexOrdering>;
/// Get the [`EquivalenceProperties`] within the plan.
///
@@ -474,7 +474,7 @@ impl ExecutionPlanProperties for Arc<dyn ExecutionPlan> {
self.properties().execution_mode()
}
- fn output_ordering(&self) -> Option<LexOrderingRef> {
+ fn output_ordering(&self) -> Option<&LexOrdering> {
self.properties().output_ordering()
}
@@ -492,7 +492,7 @@ impl ExecutionPlanProperties for &dyn ExecutionPlan {
self.properties().execution_mode()
}
- fn output_ordering(&self) -> Option<LexOrderingRef> {
+ fn output_ordering(&self) -> Option<&LexOrdering> {
self.properties().output_ordering()
}
@@ -643,8 +643,8 @@ impl PlanProperties {
&self.partitioning
}
- pub fn output_ordering(&self) -> Option<LexOrderingRef> {
- self.output_ordering.as_deref()
+ pub fn output_ordering(&self) -> Option<&LexOrdering> {
+ self.output_ordering.as_ref()
}
pub fn execution_mode(&self) -> ExecutionMode {
diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs
index f08ce0ea2f..cea04ccad3 100644
--- a/datafusion/physical-plan/src/joins/stream_join_utils.rs
+++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs
@@ -40,7 +40,7 @@ use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
use datafusion_physical_expr::utils::collect_columns;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use hashbrown::raw::RawTable;
/// Implementation of `JoinHashMapType` for `PruningJoinHashMap`.
@@ -744,8 +744,8 @@ pub fn prepare_sorted_exprs(
filter: &JoinFilter,
left: &Arc<dyn ExecutionPlan>,
right: &Arc<dyn ExecutionPlan>,
- left_sort_exprs: LexOrderingRef,
- right_sort_exprs: LexOrderingRef,
+ left_sort_exprs: &LexOrdering,
+ right_sort_exprs: &LexOrdering,
) -> Result<(SortedFilterExpr, SortedFilterExpr, ExprIntervalGraph)> {
let err = || {
datafusion_common::plan_datafusion_err!("Filter does not include the child order")
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index f082bdbdd3..84c9f03b07 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -73,9 +73,7 @@ use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
use ahash::RandomState;
-use datafusion_physical_expr_common::sort_expr::{
- LexOrdering, LexOrderingRef, LexRequirement,
-};
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use futures::{ready, Stream, StreamExt};
use parking_lot::Mutex;
@@ -319,13 +317,13 @@ impl SymmetricHashJoinExec {
}
/// Get left_sort_exprs
- pub fn left_sort_exprs(&self) -> Option<LexOrderingRef> {
- self.left_sort_exprs.as_deref()
+ pub fn left_sort_exprs(&self) -> Option<&LexOrdering> {
+ self.left_sort_exprs.as_ref()
}
/// Get right_sort_exprs
- pub fn right_sort_exprs(&self) -> Option<LexOrderingRef> {
- self.right_sort_exprs.as_deref()
+ pub fn right_sort_exprs(&self) -> Option<&LexOrdering> {
+ self.right_sort_exprs.as_ref()
}
/// Check if order information covers every column in the filter expression.
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index a257119a8b..0366c9fa5e 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -51,7 +51,7 @@ use datafusion_physical_expr::equivalence::add_offset_to_expr;
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::utils::{collect_columns, merge_vectors};
use datafusion_physical_expr::{
- LexOrdering, LexOrderingRef, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
+ LexOrdering, PhysicalExpr, PhysicalExprRef, PhysicalSortExpr,
};
use futures::future::{BoxFuture, Shared};
@@ -469,7 +469,7 @@ fn replace_on_columns_of_right_ordering(
}
fn offset_ordering(
- ordering: LexOrderingRef,
+ ordering: &LexOrdering,
join_type: &JoinType,
offset: usize,
) -> LexOrdering {
@@ -483,14 +483,14 @@ fn offset_ordering(
options: sort_expr.options,
})
.collect(),
- _ => LexOrdering::from_ref(ordering),
+ _ => ordering.clone(),
}
}
/// Calculate the output ordering of a given join operation.
pub fn calculate_join_output_ordering(
- left_ordering: LexOrderingRef,
- right_ordering: LexOrderingRef,
+ left_ordering: &LexOrdering,
+ right_ordering: &LexOrdering,
join_type: JoinType,
on_columns: &[(PhysicalExprRef, PhysicalExprRef)],
left_columns_len: usize,
@@ -503,7 +503,7 @@ pub fn calculate_join_output_ordering(
if join_type == JoinType::Inner && probe_side == Some(JoinSide::Left) {
replace_on_columns_of_right_ordering(
on_columns,
- &mut LexOrdering::from_ref(right_ordering),
+ &mut right_ordering.clone(),
)
.ok()?;
merge_vectors(
@@ -512,7 +512,7 @@ pub fn calculate_join_output_ordering(
.as_ref(),
)
} else {
- LexOrdering::from_ref(left_ordering)
+ left_ordering.clone()
}
}
[false, true] => {
@@ -520,7 +520,7 @@ pub fn calculate_join_output_ordering(
if join_type == JoinType::Inner && probe_side == Some(JoinSide::Right) {
replace_on_columns_of_right_ordering(
on_columns,
- &mut LexOrdering::from_ref(right_ordering),
+ &mut right_ordering.clone(),
)
.ok()?;
merge_vectors(
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 4d0dbc75d4..1730c7d8dc 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -29,6 +29,7 @@ use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
use super::{
DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
};
+use crate::execution_plan::CardinalityEffect;
use crate::hash_utils::create_hashes;
use crate::metrics::BaselineMetrics;
use crate::repartition::distributor_channels::{
@@ -48,10 +49,9 @@ use datafusion_common_runtime::SpawnedTask;
use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{EquivalenceProperties, PhysicalExpr};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
-use crate::execution_plan::CardinalityEffect;
use datafusion_common::HashMap;
-use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
use futures::stream::Stream;
use futures::{FutureExt, StreamExt, TryStreamExt};
use log::trace;
@@ -503,7 +503,7 @@ impl DisplayAs for RepartitionExec {
}
if let Some(sort_exprs) = self.sort_exprs() {
- write!(f, ", sort_exprs={}", LexOrdering::from_ref(sort_exprs))?;
+ write!(f, ", sort_exprs={}", sort_exprs.clone())?;
}
Ok(())
}
@@ -572,7 +572,7 @@ impl ExecutionPlan for RepartitionExec {
let schema_captured = Arc::clone(&schema);
// Get existing ordering to use for merging
- let sort_exprs = self.sort_exprs().unwrap_or(&[]).to_owned();
+ let sort_exprs = self.sort_exprs().cloned().unwrap_or_default();
let stream = futures::stream::once(async move {
let num_input_partitions = input.output_partitioning().partition_count();
@@ -756,7 +756,7 @@ impl RepartitionExec {
}
/// Return the sort expressions that are used to merge
- fn sort_exprs(&self) -> Option<&[PhysicalSortExpr]> {
+ fn sort_exprs(&self) -> Option<&LexOrdering> {
if self.preserve_order {
self.input.output_ordering()
} else {
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs
index 8f853464c9..e69989c1be 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -72,7 +72,6 @@ use datafusion_common::Result;
use datafusion_execution::{RecordBatchStream, TaskContext};
use datafusion_physical_expr::LexOrdering;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
use futures::{ready, Stream, StreamExt};
use log::trace;
@@ -159,7 +158,7 @@ impl PartialSortExec {
}
/// Sort expressions
- pub fn expr(&self) -> LexOrderingRef {
+ pub fn expr(&self) -> &LexOrdering {
self.expr.as_ref()
}
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index d90d0f64ce..ce7efce415 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -52,9 +52,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::LexOrdering;
-use datafusion_physical_expr_common::sort_expr::{
- LexOrderingRef, PhysicalSortRequirement,
-};
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
use crate::execution_plan::CardinalityEffect;
use futures::{StreamExt, TryStreamExt};
@@ -344,10 +342,12 @@ impl ExternalSorter {
streams.push(stream);
}
+ let expressions: LexOrdering = self.expr.iter().cloned().collect();
+
StreamingMergeBuilder::new()
.with_streams(streams)
.with_schema(Arc::clone(&self.schema))
- .with_expressions(self.expr.to_vec().as_slice())
+ .with_expressions(expressions.as_ref())
.with_metrics(self.metrics.baseline.clone())
.with_batch_size(self.batch_size)
.with_fetch(self.fetch)
@@ -536,10 +536,12 @@ impl ExternalSorter {
})
.collect::<Result<_>>()?;
+ let expressions: LexOrdering = self.expr.iter().cloned().collect();
+
StreamingMergeBuilder::new()
.with_streams(streams)
.with_schema(Arc::clone(&self.schema))
- .with_expressions(self.expr.as_ref())
+ .with_expressions(expressions.as_ref())
.with_metrics(metrics)
.with_batch_size(self.batch_size)
.with_fetch(self.fetch)
@@ -561,7 +563,7 @@ impl ExternalSorter {
let schema = batch.schema();
let fetch = self.fetch;
- let expressions = Arc::clone(&self.expr);
+ let expressions: LexOrdering = self.expr.iter().cloned().collect();
let stream = futures::stream::once(futures::future::lazy(move |_| {
let timer = metrics.elapsed_compute().timer();
let sorted = sort_batch(&batch, &expressions, fetch)?;
@@ -603,7 +605,7 @@ impl Debug for ExternalSorter {
pub fn sort_batch(
batch: &RecordBatch,
- expressions: LexOrderingRef,
+ expressions: &LexOrdering,
fetch: Option<usize>,
) -> Result<RecordBatch> {
let sort_columns = expressions
@@ -762,8 +764,8 @@ impl SortExec {
}
/// Sort expressions
- pub fn expr(&self) -> LexOrderingRef {
- self.expr.as_ref()
+ pub fn expr(&self) -> &LexOrdering {
+ &self.expr
}
/// If `Some(fetch)`, limits output to only the first "fetch" items
@@ -790,11 +792,10 @@ impl SortExec {
preserve_partitioning: bool,
) -> PlanProperties {
// Determine execution mode:
- let sort_satisfied = input.equivalence_properties().ordering_satisfy_requirement(
- PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter())
- .inner
- .as_slice(),
- );
+ let requirement = LexRequirement::from(sort_exprs);
+ let sort_satisfied = input
+ .equivalence_properties()
+ .ordering_satisfy_requirement(&requirement);
let mode = match input.execution_mode() {
ExecutionMode::Unbounded if sort_satisfied => ExecutionMode::Unbounded,
ExecutionMode::Bounded => ExecutionMode::Bounded,
@@ -803,6 +804,7 @@ impl SortExec {
// Calculate equivalence properties; i.e. reset the ordering equivalence
// class with the new ordering:
+ let sort_exprs = LexOrdering::from(requirement);
let eq_properties = input
.equivalence_properties()
.clone()
@@ -890,11 +892,7 @@ impl ExecutionPlan for SortExec {
let sort_satisfied = self
.input
.equivalence_properties()
- .ordering_satisfy_requirement(
- PhysicalSortRequirement::from_sort_exprs(self.expr.iter())
- .inner
- .as_slice(),
- );
+ .ordering_satisfy_requirement(&LexRequirement::from(self.expr.clone()));
match (sort_satisfied, self.fetch.as_ref()) {
(true, Some(fetch)) => Ok(Box::pin(LimitStream::new(
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 9ee0faaa0a..5c80322afe 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -34,9 +34,7 @@ use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::PhysicalSortRequirement;
-use datafusion_physical_expr_common::sort_expr::{
- LexOrdering, LexOrderingRef, LexRequirement,
-};
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use log::{debug, trace};
/// Sort preserving merge execution plan
@@ -122,8 +120,8 @@ impl SortPreservingMergeExec {
}
/// Sort expressions
- pub fn expr(&self) -> LexOrderingRef {
- &self.expr
+ pub fn expr(&self) -> &LexOrdering {
+ self.expr.as_ref()
}
/// Fetch
diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs
index 70beb2c4a9..7c57fdf9ba 100644
--- a/datafusion/physical-plan/src/sorts/stream.rs
+++ b/datafusion/physical-plan/src/sorts/stream.rs
@@ -24,7 +24,7 @@ use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};
use datafusion_common::Result;
use datafusion_execution::memory_pool::MemoryReservation;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use futures::stream::{Fuse, StreamExt};
use std::marker::PhantomData;
use std::sync::Arc;
@@ -93,7 +93,7 @@ pub struct RowCursorStream {
impl RowCursorStream {
pub fn try_new(
schema: &Schema,
- expressions: LexOrderingRef,
+ expressions: &LexOrdering,
streams: Vec<SendableRecordBatchStream>,
reservation: MemoryReservation,
) -> Result<Self> {
diff --git a/datafusion/physical-plan/src/sorts/streaming_merge.rs b/datafusion/physical-plan/src/sorts/streaming_merge.rs
index bd74685eac..4350235ef4 100644
--- a/datafusion/physical-plan/src/sorts/streaming_merge.rs
+++ b/datafusion/physical-plan/src/sorts/streaming_merge.rs
@@ -28,7 +28,7 @@ use arrow::datatypes::{DataType, SchemaRef};
use arrow_array::*;
use datafusion_common::{internal_err, Result};
use datafusion_execution::memory_pool::MemoryReservation;
-use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
macro_rules! primitive_merge_helper {
($t:ty, $($v:ident),+) => {
@@ -51,11 +51,10 @@ macro_rules! merge_helper {
}};
}
-#[derive(Default)]
pub struct StreamingMergeBuilder<'a> {
streams: Vec<SendableRecordBatchStream>,
schema: Option<SchemaRef>,
- expressions: LexOrderingRef<'a>,
+ expressions: &'a LexOrdering,
metrics: Option<BaselineMetrics>,
batch_size: Option<usize>,
fetch: Option<usize>,
@@ -63,6 +62,21 @@ pub struct StreamingMergeBuilder<'a> {
enable_round_robin_tie_breaker: bool,
}
+impl<'a> Default for StreamingMergeBuilder<'a> {
+ fn default() -> Self {
+ Self {
+ streams: vec![],
+ schema: None,
+ expressions: LexOrdering::empty(),
+ metrics: None,
+ batch_size: None,
+ fetch: None,
+ reservation: None,
+ enable_round_robin_tie_breaker: false,
+ }
+ }
+}
+
impl<'a> StreamingMergeBuilder<'a> {
pub fn new() -> Self {
Self {
@@ -81,7 +95,7 @@ impl<'a> StreamingMergeBuilder<'a> {
self
}
- pub fn with_expressions(mut self, expressions: LexOrderingRef<'a>) -> Self {
+ pub fn with_expressions(mut self, expressions: &'a LexOrdering) -> Self {
self.expressions = expressions;
self
}
diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
index c3e0a4e389..602efa54f8 100644
--- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
+++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs
@@ -1188,7 +1188,6 @@ mod tests {
};
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
- use datafusion_physical_expr_common::sort_expr::LexOrderingRef;
use futures::future::Shared;
use futures::{pin_mut, ready, FutureExt, Stream, StreamExt};
use itertools::Itertools;
@@ -1555,7 +1554,7 @@ mod tests {
Arc::new(BuiltInWindowExpr::new(
last_value_func,
&[],
- LexOrderingRef::default(),
+ &LexOrdering::default(),
Arc::new(WindowFrame::new_bounds(
WindowFrameUnits::Rows,
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
@@ -1566,7 +1565,7 @@ mod tests {
Arc::new(BuiltInWindowExpr::new(
nth_value_func1,
&[],
- LexOrderingRef::default(),
+ &LexOrdering::default(),
Arc::new(WindowFrame::new_bounds(
WindowFrameUnits::Rows,
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
@@ -1577,7 +1576,7 @@ mod tests {
Arc::new(BuiltInWindowExpr::new(
nth_value_func2,
&[],
- LexOrderingRef::default(),
+ &LexOrdering::default(),
Arc::new(WindowFrame::new_bounds(
WindowFrameUnits::Rows,
WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 217823fb6a..da7f6d79e5 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -53,7 +53,7 @@ use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
};
-use datafusion_physical_expr_common::sort_expr::{LexOrderingRef, LexRequirement};
+use datafusion_physical_expr_common::sort_expr::LexRequirement;
pub use window_agg_exec::WindowAggExec;
/// Build field from window function and add it into schema
@@ -98,7 +98,7 @@ pub fn create_window_expr(
name: String,
args: &[Arc<dyn PhysicalExpr>],
partition_by: &[Arc<dyn PhysicalExpr>],
- order_by: LexOrderingRef,
+ order_by: &LexOrdering,
window_frame: Arc<WindowFrame>,
input_schema: &Schema,
ignore_nulls: bool,
@@ -139,7 +139,7 @@ pub fn create_window_expr(
/// Creates an appropriate [`WindowExpr`] based on the window frame and
fn window_expr_from_aggregate_expr(
partition_by: &[Arc<dyn PhysicalExpr>],
- order_by: LexOrderingRef,
+ order_by: &LexOrdering,
window_frame: Arc<WindowFrame>,
aggregate: Arc<AggregateFunctionExpr>,
) -> Arc<dyn WindowExpr> {
@@ -497,7 +497,7 @@ pub fn get_best_fitting_window(
/// the mode this window operator should work in to accommodate the existing ordering.
pub fn get_window_mode(
partitionby_exprs: &[Arc<dyn PhysicalExpr>],
- orderby_keys: LexOrderingRef,
+ orderby_keys: &LexOrdering,
input: &Arc<dyn ExecutionPlan>,
) -> Option<(bool, InputOrderMode)> {
let input_eqs = input.equivalence_properties().clone();
@@ -699,7 +699,7 @@ mod tests {
"count".to_owned(),
&[col("a", &schema)?],
&[],
- LexOrderingRef::default(),
+ &LexOrdering::default(),
Arc::new(WindowFrame::new(None)),
schema.as_ref(),
false,
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 4bf7e35332..dc94ad075c 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -53,7 +53,7 @@ pub fn serialize_physical_aggr_expr(
) -> Result<protobuf::PhysicalExprNode> {
let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?;
let ordering_req = match aggr_expr.order_bys() {
- Some(order) => LexOrdering::from_ref(order),
+ Some(order) => order.clone(),
None => LexOrdering::default(),
};
let ordering_req = serialize_physical_sort_exprs(ordering_req, codec)?;
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 1e078ee410..8c8dcccee3 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -52,8 +52,7 @@ use datafusion::logical_expr::{create_udf, JoinType, Operator, Volatility};
use datafusion::physical_expr::expressions::Literal;
use datafusion::physical_expr::window::SlidingAggregateWindowExpr;
use datafusion::physical_expr::{
- LexOrdering, LexOrderingRef, LexRequirement, PhysicalSortRequirement,
- ScalarFunctionExpr,
+ LexOrdering, LexRequirement, PhysicalSortRequirement, ScalarFunctionExpr,
};
use datafusion::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
@@ -288,13 +287,15 @@ fn roundtrip_window() -> Result<()> {
false,
)),
&[col("b", &schema)?],
- &[PhysicalSortExpr {
- expr: col("a", &schema)?,
- options: SortOptions {
- descending: false,
- nulls_first: false,
- },
- }],
+ &LexOrdering{
+ inner: vec![PhysicalSortExpr {
+ expr: col("a", &schema)?,
+ options: SortOptions {
+ descending: false,
+ nulls_first: false,
+ },
+ }]
+ },
Arc::new(window_frame),
));
@@ -308,7 +309,7 @@ fn roundtrip_window() -> Result<()> {
.build()
.map(Arc::new)?,
&[],
- LexOrderingRef::default(),
+ &LexOrdering::default(),
Arc::new(WindowFrame::new(None)),
));
@@ -328,7 +329,7 @@ fn roundtrip_window() -> Result<()> {
let sliding_aggr_window_expr = Arc::new(SlidingAggregateWindowExpr::new(
sum_expr,
&[],
- LexOrderingRef::default(),
+ &LexOrdering::default(),
Arc::new(window_frame),
));
@@ -1014,7 +1015,7 @@ fn roundtrip_scalar_udf_extension_codec() -> Result<()> {
vec![Arc::new(PlainAggregateWindowExpr::new(
aggr_expr.clone(),
&[col("author", &schema)?],
- LexOrderingRef::default(),
+ &LexOrdering::default(),
Arc::new(WindowFrame::new(None)),
))],
filter,
@@ -1075,7 +1076,7 @@ fn roundtrip_aggregate_udf_extension_codec() -> Result<()> {
vec![Arc::new(PlainAggregateWindowExpr::new(
aggr_expr,
&[col("author", &schema)?],
- LexOrderingRef::default(),
+ &LexOrdering::default(),
Arc::new(WindowFrame::new(None)),
))],
filter,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]