This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6a02384d19 Deprecate `PhysicalSortRequirement::from_sort_exprs` and
`PhysicalSortRequirement::to_sort_exprs` (#13222)
6a02384d19 is described below
commit 6a02384d198b3420b96892cc7a72fc202770375c
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Nov 6 06:24:40 2024 -0500
Deprecate `PhysicalSortRequirement::from_sort_exprs` and
`PhysicalSortRequirement::to_sort_exprs` (#13222)
* Deprecate `PhysicalSortRequirement::from_sort_exprs` and
`PhysicalSortRequirement::to_sort_exprs`
* Update for API changes
* fix clippy
* fmt
---
.../src/physical_optimizer/enforce_distribution.rs | 5 +---
.../core/src/physical_optimizer/enforce_sorting.rs | 8 +++---
.../core/src/physical_optimizer/sort_pushdown.rs | 27 ++++++++++----------
.../core/src/physical_optimizer/test_utils.rs | 8 ++----
.../src/physical_optimizer/update_aggr_exprs.rs | 10 +++-----
datafusion/core/src/physical_optimizer/utils.rs | 5 ++--
datafusion/physical-expr-common/src/sort_expr.rs | 29 +++++-----------------
datafusion/physical-expr/src/equivalence/class.rs | 4 +--
.../physical-expr/src/equivalence/properties.rs | 12 ++++-----
.../physical-optimizer/src/output_requirements.rs | 6 ++---
datafusion/physical-plan/src/aggregates/mod.rs | 6 ++---
.../physical-plan/src/joins/sort_merge_join.rs | 10 +++-----
.../physical-plan/src/joins/symmetric_hash_join.rs | 10 ++++----
.../src/sorts/sort_preserving_merge.rs | 5 +---
datafusion/physical-plan/src/windows/mod.rs | 5 ++--
datafusion/proto/src/physical_plan/mod.rs | 10 +++-----
16 files changed, 61 insertions(+), 99 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 6863978610..4bbb995c36 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1423,7 +1423,6 @@ pub(crate) mod tests {
use datafusion_physical_expr::expressions::{BinaryExpr, Literal};
use datafusion_physical_expr::{
expressions::binary, expressions::lit, LexOrdering, PhysicalSortExpr,
- PhysicalSortRequirement,
};
use datafusion_physical_expr_common::sort_expr::LexRequirement;
use datafusion_physical_plan::PlanProperties;
@@ -1496,9 +1495,7 @@ pub(crate) mod tests {
if self.expr.is_empty() {
vec![None]
} else {
- vec![Some(PhysicalSortRequirement::from_sort_exprs(
- self.expr.iter(),
- ))]
+ vec![Some(LexRequirement::from(self.expr.clone()))]
}
}
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index adc3d7cac1..636d52ccc9 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -61,7 +61,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan,
InputOrderMode};
use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_physical_expr::{Partitioning, PhysicalSortRequirement};
+use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
@@ -221,7 +221,7 @@ fn replace_with_partial_sort(
// here we're trying to find the common prefix for sorted columns that
is required for the
// sort and already satisfied by the given ordering
let child_eq_properties = child.equivalence_properties();
- let sort_req =
PhysicalSortRequirement::from_sort_exprs(sort_plan.expr());
+ let sort_req = LexRequirement::from(sort_plan.expr().clone());
let mut common_prefix_length = 0;
while child_eq_properties.ordering_satisfy_requirement(&LexRequirement
{
@@ -275,8 +275,8 @@ fn parallelize_sorts(
{
// Take the initial sort expressions and requirements
let (sort_exprs, fetch) = get_sort_exprs(&requirements.plan)?;
- let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs);
- let sort_exprs = LexOrdering::new(sort_exprs.to_vec());
+ let sort_reqs = LexRequirement::from(sort_exprs.clone());
+ let sort_exprs = sort_exprs.clone();
// If there is a connection between a `CoalescePartitionsExec` and a
// global sort that satisfy the requirements (i.e. intermediate
diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
index e231e719b2..42d682169d 100644
--- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs
@@ -94,7 +94,8 @@ fn pushdown_sorts_helper(
if is_sort(plan) {
let required_ordering = plan
.output_ordering()
- .map(PhysicalSortRequirement::from_sort_exprs)
+ .cloned()
+ .map(LexRequirement::from)
.unwrap_or_default();
if !satisfy_parent {
// Make sure this `SortExec` satisfies parent requirements:
@@ -180,11 +181,12 @@ fn pushdown_requirement_to_children(
RequirementsCompatibility::NonCompatible => Ok(None),
}
} else if let Some(sort_exec) = plan.as_any().downcast_ref::<SortExec>() {
- let sort_req = PhysicalSortRequirement::from_sort_exprs(
+ let sort_req = LexRequirement::from(
sort_exec
.properties()
.output_ordering()
- .unwrap_or(&LexOrdering::default()),
+ .cloned()
+ .unwrap_or(LexOrdering::default()),
);
if sort_exec
.properties()
@@ -205,10 +207,11 @@ fn pushdown_requirement_to_children(
.iter()
.all(|maintain| *maintain)
{
- let output_req = PhysicalSortRequirement::from_sort_exprs(
+ let output_req = LexRequirement::from(
plan.properties()
.output_ordering()
- .unwrap_or(&LexOrdering::default()),
+ .cloned()
+ .unwrap_or(LexOrdering::default()),
);
// Push down through operator with fetch when:
// - requirement is aligned with output ordering
@@ -227,14 +230,12 @@ fn pushdown_requirement_to_children(
} else if is_union(plan) {
// UnionExec does not have real sort requirements for its input. Here
we change the adjusted_request_ordering to UnionExec's output ordering and
// propagate the sort requirements down to correct the unnecessary
descendant SortExec under the UnionExec
- let req = (!parent_required.is_empty())
- .then(|| LexRequirement::new(parent_required.to_vec()));
+ let req = (!parent_required.is_empty()).then(||
parent_required.clone());
Ok(Some(vec![req; plan.children().len()]))
} else if let Some(smj) =
plan.as_any().downcast_ref::<SortMergeJoinExec>() {
// If the current plan is SortMergeJoinExec
let left_columns_len = smj.left().schema().fields().len();
- let parent_required_expr =
-
PhysicalSortRequirement::to_sort_exprs(parent_required.iter().cloned());
+ let parent_required_expr = LexOrdering::from(parent_required.clone());
match expr_source_side(
parent_required_expr.as_ref(),
smj.join_type(),
@@ -251,8 +252,7 @@ fn pushdown_requirement_to_children(
smj.schema().fields.len() -
smj.right().schema().fields.len();
let new_right_required =
shift_right_required(parent_required, right_offset)?;
- let new_right_required_expr =
- PhysicalSortRequirement::to_sort_exprs(new_right_required);
+ let new_right_required_expr =
LexOrdering::from(new_right_required);
try_pushdown_requirements_to_join(
smj,
parent_required,
@@ -278,8 +278,7 @@ fn pushdown_requirement_to_children(
// Pushing down is not beneficial
Ok(None)
} else if is_sort_preserving_merge(plan) {
- let new_ordering =
- PhysicalSortRequirement::to_sort_exprs(parent_required.to_vec());
+ let new_ordering = LexOrdering::from(parent_required.clone());
let mut spm_eqs = plan.equivalence_properties().clone();
// Sort preserving merge will have new ordering, one requirement above
is pushed down to its below.
spm_eqs = spm_eqs.with_reorder(new_ordering);
@@ -412,7 +411,7 @@ fn try_pushdown_requirements_to_join(
let should_pushdown =
smj_eqs.ordering_satisfy_requirement(parent_required);
Ok(should_pushdown.then(|| {
let mut required_input_ordering = smj.required_input_ordering();
- let new_req =
Some(PhysicalSortRequirement::from_sort_exprs(sort_expr));
+ let new_req = Some(LexRequirement::from(sort_expr.clone()));
match push_side {
JoinSide::Left => {
required_input_ordering[0] = new_req;
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs
b/datafusion/core/src/physical_optimizer/test_utils.rs
index bdf16300ea..88bb0b6fef 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -56,9 +56,7 @@ use datafusion_physical_plan::{
use async_trait::async_trait;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_physical_expr_common::sort_expr::{
- LexOrdering, LexRequirement, PhysicalSortRequirement,
-};
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
async fn register_current_csv(
ctx: &SessionContext,
@@ -419,9 +417,7 @@ impl ExecutionPlan for RequirementsTestExec {
}
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
- let requirement = PhysicalSortRequirement::from_sort_exprs(
- self.required_input_ordering.as_ref().iter(),
- );
+ let requirement =
LexRequirement::from(self.required_input_ordering.clone());
vec![Some(requirement)]
}
diff --git a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
index c9363b00e1..d563d0c56d 100644
--- a/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
+++ b/datafusion/core/src/physical_optimizer/update_aggr_exprs.rs
@@ -27,7 +27,7 @@ use
datafusion_physical_expr::aggregate::AggregateFunctionExpr;
use datafusion_physical_expr::{
reverse_order_bys, EquivalenceProperties, PhysicalSortRequirement,
};
-use datafusion_physical_expr_common::sort_expr::LexRequirement;
+use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::aggregates::concat_slices;
use datafusion_physical_plan::windows::get_ordered_partition_by_indices;
@@ -139,12 +139,10 @@ fn try_convert_aggregate_if_better(
aggr_exprs
.into_iter()
.map(|aggr_expr| {
- let aggr_sort_exprs =
&aggr_expr.order_bys().cloned().unwrap_or_default();
+ let aggr_sort_exprs =
aggr_expr.order_bys().unwrap_or(LexOrdering::empty());
let reverse_aggr_sort_exprs = reverse_order_bys(aggr_sort_exprs);
- let aggr_sort_reqs =
-
PhysicalSortRequirement::from_sort_exprs(aggr_sort_exprs.iter());
- let reverse_aggr_req =
-
PhysicalSortRequirement::from_sort_exprs(&reverse_aggr_sort_exprs.inner);
+ let aggr_sort_reqs = LexRequirement::from(aggr_sort_exprs.clone());
+ let reverse_aggr_req =
LexRequirement::from(reverse_aggr_sort_exprs);
// If the aggregate expression benefits from input ordering, and
// there is an actual ordering enabling this, try to update the
diff --git a/datafusion/core/src/physical_optimizer/utils.rs
b/datafusion/core/src/physical_optimizer/utils.rs
index 8007d8cc7f..9acd3f67c2 100644
--- a/datafusion/core/src/physical_optimizer/utils.rs
+++ b/datafusion/core/src/physical_optimizer/utils.rs
@@ -27,7 +27,8 @@ use crate::physical_plan::union::UnionExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
-use datafusion_physical_expr::{LexRequirement, PhysicalSortRequirement};
+use datafusion_physical_expr::LexRequirement;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::tree_node::PlanContext;
@@ -38,7 +39,7 @@ pub fn add_sort_above<T: Clone + Default>(
sort_requirements: LexRequirement,
fetch: Option<usize>,
) -> PlanContext<T> {
- let mut sort_expr =
PhysicalSortRequirement::to_sort_exprs(sort_requirements);
+ let mut sort_expr = LexOrdering::from(sort_requirements);
sort_expr.inner.retain(|sort_expr| {
!node
.plan
diff --git a/datafusion/physical-expr-common/src/sort_expr.rs
b/datafusion/physical-expr-common/src/sort_expr.rs
index fc11a6df56..9ae12fa9f6 100644
--- a/datafusion/physical-expr-common/src/sort_expr.rs
+++ b/datafusion/physical-expr-common/src/sort_expr.rs
@@ -296,26 +296,14 @@ impl PhysicalSortRequirement {
})
}
- /// Returns [`PhysicalSortRequirement`] that requires the exact
- /// sort of the [`PhysicalSortExpr`]s in `ordering`
- ///
- /// This method takes `&'a PhysicalSortExpr` to make it easy to
- /// use implementing [`ExecutionPlan::required_input_ordering`].
- ///
- /// [`ExecutionPlan::required_input_ordering`]:
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering
+ #[deprecated(since = "43.0.0", note = "use
LexRequirement::from_lex_ordering")]
pub fn from_sort_exprs<'a>(
ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
) -> LexRequirement {
let ordering = ordering.into_iter().cloned().collect();
LexRequirement::from_lex_ordering(ordering)
}
-
- /// Converts an iterator of [`PhysicalSortRequirement`] into a Vec
- /// of [`PhysicalSortExpr`]s.
- ///
- /// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
- /// for each entry in the input. If required ordering is None for an entry
- /// default ordering `ASC, NULLS LAST` if given (see the
`PhysicalSortExpr::from`).
+ #[deprecated(since = "43.0.0", note = "use
LexOrdering::from_lex_requirement")]
pub fn to_sort_exprs(
requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
) -> LexOrdering {
@@ -416,8 +404,8 @@ impl LexOrdering {
/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
/// for each entry in the input. If required ordering is None for an entry
/// default ordering `ASC, NULLS LAST` if given (see the
`PhysicalSortExpr::from`).
- pub fn from_lex_requirement(requirements: LexRequirement) -> LexOrdering {
- requirements
+ pub fn from_lex_requirement(requirement: LexRequirement) -> LexOrdering {
+ requirement
.into_iter()
.map(PhysicalSortExpr::from)
.collect()
@@ -541,15 +529,10 @@ impl LexRequirement {
self.inner.push(physical_sort_requirement)
}
- /// Create a new [`LexRequirement`] from a vector of [`PhysicalSortExpr`]s.
+ /// Create a new [`LexRequirement`] from a [`LexOrdering`]
///
- /// Returns [`PhysicalSortRequirement`] that requires the exact
+ /// Returns [`LexRequirement`] that requires the exact
/// sort of the [`PhysicalSortExpr`]s in `ordering`
- ///
- /// This method takes `&'a PhysicalSortExpr` to make it easy to
- /// use implementing [`ExecutionPlan::required_input_ordering`].
- ///
- /// [`ExecutionPlan::required_input_ordering`]:
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html#method.required_input_ordering
pub fn from_lex_ordering(ordering: LexOrdering) -> Self {
Self::new(
ordering
diff --git a/datafusion/physical-expr/src/equivalence/class.rs
b/datafusion/physical-expr/src/equivalence/class.rs
index fbd3f3573e..e4185ad44d 100644
--- a/datafusion/physical-expr/src/equivalence/class.rs
+++ b/datafusion/physical-expr/src/equivalence/class.rs
@@ -475,11 +475,11 @@ impl EquivalenceGroup {
/// sort expressions.
pub fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) ->
LexOrdering {
// Convert sort expressions to sort requirements:
- let sort_reqs =
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
+ let sort_reqs = LexRequirement::from(sort_exprs.clone());
// Normalize the requirements:
let normalized_sort_reqs =
self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
- PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs.inner)
+ LexOrdering::from(normalized_sort_reqs)
}
/// This function applies the `normalize_sort_requirement` function for all
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
index 061e77222f..06f1e24ed2 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -409,11 +409,11 @@ impl EquivalenceProperties {
/// after deduplication.
fn normalize_sort_exprs(&self, sort_exprs: &LexOrdering) -> LexOrdering {
// Convert sort expressions to sort requirements:
- let sort_reqs =
PhysicalSortRequirement::from_sort_exprs(sort_exprs.iter());
+ let sort_reqs = LexRequirement::from(sort_exprs.clone());
// Normalize the requirements:
let normalized_sort_reqs =
self.normalize_sort_requirements(&sort_reqs);
// Convert sort requirements back to sort expressions:
- PhysicalSortRequirement::to_sort_exprs(normalized_sort_reqs)
+ LexOrdering::from(normalized_sort_reqs)
}
/// Normalizes the given sort requirements (i.e. `sort_reqs`) using the
@@ -454,7 +454,7 @@ impl EquivalenceProperties {
/// orderings.
pub fn ordering_satisfy(&self, given: &LexOrdering) -> bool {
// Convert the given sort expressions to sort requirements:
- let sort_requirements =
PhysicalSortRequirement::from_sort_exprs(given.iter());
+ let sort_requirements = LexRequirement::from(given.clone());
self.ordering_satisfy_requirement(&sort_requirements)
}
@@ -548,11 +548,11 @@ impl EquivalenceProperties {
rhs: &LexOrdering,
) -> Option<LexOrdering> {
// Convert the given sort expressions to sort requirements:
- let lhs = PhysicalSortRequirement::from_sort_exprs(lhs);
- let rhs = PhysicalSortRequirement::from_sort_exprs(rhs);
+ let lhs = LexRequirement::from(lhs.clone());
+ let rhs = LexRequirement::from(rhs.clone());
let finer = self.get_finer_requirement(&lhs, &rhs);
// Convert the chosen sort requirements back to sort expressions:
- finer.map(PhysicalSortRequirement::to_sort_exprs)
+ finer.map(LexOrdering::from)
}
/// Returns the finer ordering among the requirements `lhs` and `rhs`,
diff --git a/datafusion/physical-optimizer/src/output_requirements.rs
b/datafusion/physical-optimizer/src/output_requirements.rs
index 4f6f91a234..6c8e76bff8 100644
--- a/datafusion/physical-optimizer/src/output_requirements.rs
+++ b/datafusion/physical-optimizer/src/output_requirements.rs
@@ -33,7 +33,7 @@ use datafusion_physical_plan::{
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::{Result, Statistics};
-use datafusion_physical_expr::{Distribution, LexRequirement,
PhysicalSortRequirement};
+use datafusion_physical_expr::{Distribution, LexRequirement};
use
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion_physical_plan::{ExecutionPlanProperties, PlanProperties};
@@ -256,13 +256,13 @@ fn require_top_ordering_helper(
// Therefore; we check the sort expression field of the SortExec to
assign the requirements.
let req_ordering = sort_exec.expr();
let req_dist = sort_exec.required_input_distribution()[0].clone();
- let reqs = PhysicalSortRequirement::from_sort_exprs(req_ordering);
+ let reqs = LexRequirement::from(req_ordering.clone());
Ok((
Arc::new(OutputRequirementExec::new(plan, Some(reqs), req_dist))
as _,
true,
))
} else if let Some(spm) =
plan.as_any().downcast_ref::<SortPreservingMergeExec>() {
- let reqs = PhysicalSortRequirement::from_sort_exprs(spm.expr());
+ let reqs = LexRequirement::from(spm.expr().clone());
Ok((
Arc::new(OutputRequirementExec::new(
plan,
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs
b/datafusion/physical-plan/src/aggregates/mod.rs
index a71039b573..2220007fdd 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1074,9 +1074,7 @@ pub fn get_finer_aggregate_exprs_requirement(
);
}
- Ok(PhysicalSortRequirement::from_sort_exprs(
- requirement.inner.iter(),
- ))
+ Ok(LexRequirement::from(requirement))
}
/// Returns physical expressions for arguments to evaluate against a batch.
@@ -2304,7 +2302,7 @@ mod tests {
&eq_properties,
&AggregateMode::Partial,
)?;
- let res = PhysicalSortRequirement::to_sort_exprs(res);
+ let res = LexOrdering::from(res);
assert_eq!(res, common_requirement);
Ok(())
}
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs
b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 2f6dc5fa0b..1eb6ea6329 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -51,7 +51,7 @@ use datafusion_execution::memory_pool::{MemoryConsumer,
MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
-use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
+use datafusion_physical_expr::PhysicalExprRef;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use futures::{Stream, StreamExt};
@@ -297,12 +297,8 @@ impl ExecutionPlan for SortMergeJoinExec {
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
vec![
- Some(PhysicalSortRequirement::from_sort_exprs(
- self.left_sort_exprs.iter(),
- )),
- Some(PhysicalSortRequirement::from_sort_exprs(
- self.right_sort_exprs.iter(),
- )),
+ Some(LexRequirement::from(self.left_sort_exprs.clone())),
+ Some(LexRequirement::from(self.right_sort_exprs.clone())),
]
}
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index 84c9f03b07..94ef4d5bc3 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -70,7 +70,7 @@ use datafusion_execution::TaskContext;
use datafusion_expr::interval_arithmetic::Interval;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;
-use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};
+use datafusion_physical_expr::PhysicalExprRef;
use ahash::RandomState;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
@@ -415,12 +415,12 @@ impl ExecutionPlan for SymmetricHashJoinExec {
vec![
self.left_sort_exprs
.as_ref()
- .map(LexOrdering::iter)
- .map(PhysicalSortRequirement::from_sort_exprs),
+ .cloned()
+ .map(LexRequirement::from),
self.right_sort_exprs
.as_ref()
- .map(LexOrdering::iter)
- .map(PhysicalSortRequirement::from_sort_exprs),
+ .cloned()
+ .map(LexRequirement::from),
]
}
diff --git a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
index 5c80322afe..9a89db9a58 100644
--- a/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
+++ b/datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -32,7 +32,6 @@ use crate::{
use datafusion_common::{internal_err, Result};
use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::TaskContext;
-use datafusion_physical_expr::PhysicalSortRequirement;
use datafusion_physical_expr_common::sort_expr::{LexOrdering, LexRequirement};
use log::{debug, trace};
@@ -203,9 +202,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
}
fn required_input_ordering(&self) -> Vec<Option<LexRequirement>> {
- vec![Some(PhysicalSortRequirement::from_sort_exprs(
- self.expr.iter(),
- ))]
+ vec![Some(LexRequirement::from(self.expr.clone()))]
}
fn maintains_input_order(&self) -> Vec<bool> {
diff --git a/datafusion/physical-plan/src/windows/mod.rs
b/datafusion/physical-plan/src/windows/mod.rs
index da7f6d79e5..24aba011d0 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -516,9 +516,8 @@ pub fn get_window_mode(
// Treat partition by exprs as constant. During analysis of requirements
are satisfied.
let const_exprs = partitionby_exprs.iter().map(ConstExpr::from);
let partition_by_eqs = input_eqs.with_constants(const_exprs);
- let order_by_reqs =
PhysicalSortRequirement::from_sort_exprs(orderby_keys.iter());
- let reverse_order_by_reqs =
-
PhysicalSortRequirement::from_sort_exprs(reverse_order_bys(orderby_keys).iter());
+ let order_by_reqs = LexRequirement::from(orderby_keys.clone());
+ let reverse_order_by_reqs =
LexRequirement::from(reverse_order_bys(orderby_keys));
for (should_swap, order_by_reqs) in
[(false, order_by_reqs), (true, reverse_order_by_reqs)]
{
diff --git a/datafusion/proto/src/physical_plan/mod.rs
b/datafusion/proto/src/physical_plan/mod.rs
index e84eae2b90..64e462d169 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -35,7 +35,7 @@ use datafusion::datasource::physical_plan::{AvroExec,
CsvExec};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::physical_expr::aggregate::AggregateFunctionExpr;
-use datafusion::physical_expr::{LexOrdering, PhysicalExprRef,
PhysicalSortRequirement};
+use datafusion::physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef};
use datafusion::physical_plan::aggregates::AggregateMode;
use datafusion::physical_plan::aggregates::{AggregateExec, PhysicalGroupBy};
use datafusion::physical_plan::analyze::AnalyzeExec;
@@ -1037,7 +1037,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
&sink_schema,
extension_codec,
)
- .map(|item|
PhysicalSortRequirement::from_sort_exprs(&item.inner))
+ .map(LexRequirement::from)
})
.transpose()?;
Ok(Arc::new(DataSinkExec::new(
@@ -1067,7 +1067,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
&sink_schema,
extension_codec,
)
- .map(|item|
PhysicalSortRequirement::from_sort_exprs(&item.inner))
+ .map(LexRequirement::from)
})
.transpose()?;
Ok(Arc::new(DataSinkExec::new(
@@ -1104,9 +1104,7 @@ impl AsExecutionPlan for protobuf::PhysicalPlanNode {
&sink_schema,
extension_codec,
)
- .map(|item| {
-
PhysicalSortRequirement::from_sort_exprs(&item.inner)
- })
+ .map(LexRequirement::from)
})
.transpose()?;
Ok(Arc::new(DataSinkExec::new(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]