This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 701abf72b1 Prune out constant expressions from output ordering. (#9947)
701abf72b1 is described below
commit 701abf72b118d2379c401de3bd12d3ce977d1bae
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Apr 5 09:18:37 2024 +0300
Prune out constant expressions from output ordering. (#9947)
* Initial commit
* Add new unit test
* Make requirement explicit
* Minor changes
* Minor changes
---
.../src/physical_optimizer/enforce_distribution.rs | 96 ++++++++++++++--------
.../physical-expr/src/equivalence/properties.rs | 10 +++
datafusion/physical-plan/src/lib.rs | 4 +-
3 files changed, 76 insertions(+), 34 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index a58f8698d6..145f08af76 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1327,11 +1327,6 @@ pub(crate) mod tests {
}
impl SortRequiredExec {
- fn new(input: Arc<dyn ExecutionPlan>) -> Self {
- let expr = input.output_ordering().unwrap_or(&[]).to_vec();
- Self::new_with_requirement(input, expr)
- }
-
fn new_with_requirement(
input: Arc<dyn ExecutionPlan>,
requirement: Vec<PhysicalSortExpr>,
@@ -1391,10 +1386,11 @@ pub(crate) mod tests {
// model that it requires the output ordering of its input
        fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
- vec![self
- .properties()
- .output_ordering()
- .map(PhysicalSortRequirement::from_sort_exprs)]
+ if self.expr.is_empty() {
+ vec![None]
+ } else {
+            vec![Some(PhysicalSortRequirement::from_sort_exprs(&self.expr))]
+ }
}
fn with_new_children(
@@ -1677,10 +1673,6 @@ pub(crate) mod tests {
Arc::new(UnionExec::new(input))
}
-    fn sort_required_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
- Arc::new(SortRequiredExec::new(input))
- }
-
fn sort_required_exec_with_req(
input: Arc<dyn ExecutionPlan>,
sort_exprs: LexOrdering,
@@ -3206,8 +3198,10 @@ pub(crate) mod tests {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
- let plan =
-            sort_required_exec(filter_exec(sort_exec(sort_key, parquet_exec(), false)));
+ let plan = sort_required_exec_with_req(
+ filter_exec(sort_exec(sort_key.clone(), parquet_exec(), false)),
+ sort_key,
+ );
let expected = &[
"SortRequiredExec: [c@2 ASC]",
@@ -3367,18 +3361,20 @@ pub(crate) mod tests {
// Parquet(sorted)
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
- expr: col("c", &schema).unwrap(),
+ expr: col("d", &schema).unwrap(),
options: SortOptions::default(),
}];
- let plan =
-            sort_required_exec(filter_exec(parquet_exec_with_sort(vec![sort_key])));
+ let plan = sort_required_exec_with_req(
+ filter_exec(parquet_exec_with_sort(vec![sort_key.clone()])),
+ sort_key,
+ );
// during repartitioning ordering is preserved
let expected = &[
- "SortRequiredExec: [c@2 ASC]",
+ "SortRequiredExec: [d@3 ASC]",
"FilterExec: c@2 = 0",
            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
-            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC]",
];
assert_optimized!(expected, plan.clone(), true, true);
@@ -3403,7 +3399,10 @@ pub(crate) mod tests {
expr: col("c", &schema).unwrap(),
options: SortOptions::default(),
}];
-        let input1 = sort_required_exec(parquet_exec_with_sort(vec![sort_key]));
+ let input1 = sort_required_exec_with_req(
+ parquet_exec_with_sort(vec![sort_key.clone()]),
+ sort_key,
+ );
let input2 = filter_exec(parquet_exec());
let plan = union_exec(vec![input1, input2]);
@@ -3481,10 +3480,13 @@ pub(crate) mod tests {
("c".to_string(), "c".to_string()),
];
// sorted input
- let plan = sort_required_exec(projection_exec_with_alias(
- parquet_exec_multiple_sorted(vec![sort_key]),
- alias,
- ));
+ let plan = sort_required_exec_with_req(
+ projection_exec_with_alias(
+ parquet_exec_multiple_sorted(vec![sort_key.clone()]),
+ alias,
+ ),
+ sort_key,
+ );
let expected = &[
"SortRequiredExec: [c@2 ASC]",
@@ -3639,8 +3641,8 @@ pub(crate) mod tests {
options: SortOptions::default(),
}];
- let plan = filter_exec(parquet_exec_multiple_sorted(vec![sort_key]));
- let plan = sort_required_exec(plan);
+        let plan = filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()]));
+ let plan = sort_required_exec_with_req(plan, sort_key);
        // The groups must have only contiguous ranges of rows from the same file
        // if any group has rows from multiple files, the data is no longer sorted
@@ -4025,9 +4027,14 @@ pub(crate) mod tests {
}];
// SortRequired
// Parquet(sorted)
- let plan_parquet =
- sort_required_exec(parquet_exec_with_sort(vec![sort_key.clone()]));
- let plan_csv = sort_required_exec(csv_exec_with_sort(vec![sort_key]));
+ let plan_parquet = sort_required_exec_with_req(
+ parquet_exec_with_sort(vec![sort_key.clone()]),
+ sort_key.clone(),
+ );
+ let plan_csv = sort_required_exec_with_req(
+ csv_exec_with_sort(vec![sort_key.clone()]),
+ sort_key,
+ );
        // no parallelization, because SortRequiredExec doesn't benefit from increased parallelism
let expected_parquet = &[
@@ -4150,7 +4157,7 @@ pub(crate) mod tests {
}
#[test]
- fn preserve_ordering_through_repartition() -> Result<()> {
+ fn remove_unnecessary_spm_after_filter() -> Result<()> {
let schema = schema();
let sort_key = vec![PhysicalSortExpr {
expr: col("c", &schema).unwrap(),
@@ -4159,8 +4166,10 @@ pub(crate) mod tests {
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
let physical_plan = sort_preserving_merge_exec(sort_key,
filter_exec(input));
+ // Original plan expects its output to be ordered by c@2 ASC.
+        // This is still satisfied since, after the filter, that column is constant.
let expected = &[
- "SortPreservingMergeExec: [c@2 ASC]",
+ "CoalescePartitionsExec",
"FilterExec: c@2 = 0",
            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
@@ -4172,6 +4181,29 @@ pub(crate) mod tests {
Ok(())
}
+ #[test]
+ fn preserve_ordering_through_repartition() -> Result<()> {
+ let schema = schema();
+ let sort_key = vec![PhysicalSortExpr {
+ expr: col("d", &schema).unwrap(),
+ options: SortOptions::default(),
+ }];
+ let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
+        let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));
+
+ let expected = &[
+ "SortPreservingMergeExec: [d@3 ASC]",
+ "FilterExec: c@2 = 0",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[d@3 ASC]",
+ ];
+ // last flag sets config.optimizer.PREFER_EXISTING_SORT
+ assert_optimized!(expected, physical_plan.clone(), true, true);
+ assert_optimized!(expected, physical_plan, false, true);
+
+ Ok(())
+ }
+
#[test]
fn do_not_preserve_ordering_through_repartition() -> Result<()> {
let schema = schema();
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs
index 5eb9d6eb1b..7ce540b267 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -134,6 +134,16 @@ impl EquivalenceProperties {
&self.constants
}
+    /// Returns the output ordering of the properties.
+    pub fn output_ordering(&self) -> Option<LexOrdering> {
+        let constants = self.constants();
+        let mut output_ordering = self.oeq_class().output_ordering().unwrap_or_default();
+        // Prune out constant expressions
+        output_ordering
+            .retain(|sort_expr| !physical_exprs_contains(constants, &sort_expr.expr));
+        (!output_ordering.is_empty()).then_some(output_ordering)
+    }
+
    /// Returns the normalized version of the ordering equivalence class within.
/// Normalization removes constants and duplicates as well as standardizing
/// expressions according to the equivalence group within.
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 3e8e439c9a..3decf2e340 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -574,7 +574,7 @@ impl PlanProperties {
execution_mode: ExecutionMode,
) -> Self {
// Output ordering can be derived from `eq_properties`.
- let output_ordering = eq_properties.oeq_class().output_ordering();
+ let output_ordering = eq_properties.output_ordering();
Self {
eq_properties,
partitioning,
@@ -599,7 +599,7 @@ impl PlanProperties {
    pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) -> Self {
// Changing equivalence properties also changes output ordering, so
// make sure to overwrite it:
- self.output_ordering = eq_properties.oeq_class().output_ordering();
+ self.output_ordering = eq_properties.output_ordering();
self.eq_properties = eq_properties;
self
}