This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 701abf72b1 Prune out constant expressions from output ordering. (#9947)
701abf72b1 is described below

commit 701abf72b118d2379c401de3bd12d3ce977d1bae
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Apr 5 09:18:37 2024 +0300

    Prune out constant expressions from output ordering. (#9947)
    
    * Initial commit
    
    * Add new unit test
    
    * Make requirement explicit
    
    * Minor changes
    
    * Minor changes
---
 .../src/physical_optimizer/enforce_distribution.rs | 96 ++++++++++++++--------
 .../physical-expr/src/equivalence/properties.rs    | 10 +++
 datafusion/physical-plan/src/lib.rs                |  4 +-
 3 files changed, 76 insertions(+), 34 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs 
b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index a58f8698d6..145f08af76 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1327,11 +1327,6 @@ pub(crate) mod tests {
     }
 
     impl SortRequiredExec {
-        fn new(input: Arc<dyn ExecutionPlan>) -> Self {
-            let expr = input.output_ordering().unwrap_or(&[]).to_vec();
-            Self::new_with_requirement(input, expr)
-        }
-
         fn new_with_requirement(
             input: Arc<dyn ExecutionPlan>,
             requirement: Vec<PhysicalSortExpr>,
@@ -1391,10 +1386,11 @@ pub(crate) mod tests {
 
         // model that it requires the output ordering of its input
         fn required_input_ordering(&self) -> 
Vec<Option<Vec<PhysicalSortRequirement>>> {
-            vec![self
-                .properties()
-                .output_ordering()
-                .map(PhysicalSortRequirement::from_sort_exprs)]
+            if self.expr.is_empty() {
+                vec![None]
+            } else {
+                
vec![Some(PhysicalSortRequirement::from_sort_exprs(&self.expr))]
+            }
         }
 
         fn with_new_children(
@@ -1677,10 +1673,6 @@ pub(crate) mod tests {
         Arc::new(UnionExec::new(input))
     }
 
-    fn sort_required_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
-        Arc::new(SortRequiredExec::new(input))
-    }
-
     fn sort_required_exec_with_req(
         input: Arc<dyn ExecutionPlan>,
         sort_exprs: LexOrdering,
@@ -3206,8 +3198,10 @@ pub(crate) mod tests {
             expr: col("c", &schema).unwrap(),
             options: SortOptions::default(),
         }];
-        let plan =
-            sort_required_exec(filter_exec(sort_exec(sort_key, parquet_exec(), 
false)));
+        let plan = sort_required_exec_with_req(
+            filter_exec(sort_exec(sort_key.clone(), parquet_exec(), false)),
+            sort_key,
+        );
 
         let expected = &[
             "SortRequiredExec: [c@2 ASC]",
@@ -3367,18 +3361,20 @@ pub(crate) mod tests {
         //    Parquet(sorted)
         let schema = schema();
         let sort_key = vec![PhysicalSortExpr {
-            expr: col("c", &schema).unwrap(),
+            expr: col("d", &schema).unwrap(),
             options: SortOptions::default(),
         }];
-        let plan =
-            
sort_required_exec(filter_exec(parquet_exec_with_sort(vec![sort_key])));
+        let plan = sort_required_exec_with_req(
+            filter_exec(parquet_exec_with_sort(vec![sort_key.clone()])),
+            sort_key,
+        );
 
         // during repartitioning ordering is preserved
         let expected = &[
-            "SortRequiredExec: [c@2 ASC]",
+            "SortRequiredExec: [d@3 ASC]",
             "FilterExec: c@2 = 0",
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=1",
-            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[c@2 ASC]",
+            "ParquetExec: file_groups={1 group: [[x]]}, projection=[a, b, c, 
d, e], output_ordering=[d@3 ASC]",
         ];
 
         assert_optimized!(expected, plan.clone(), true, true);
@@ -3403,7 +3399,10 @@ pub(crate) mod tests {
             expr: col("c", &schema).unwrap(),
             options: SortOptions::default(),
         }];
-        let input1 = 
sort_required_exec(parquet_exec_with_sort(vec![sort_key]));
+        let input1 = sort_required_exec_with_req(
+            parquet_exec_with_sort(vec![sort_key.clone()]),
+            sort_key,
+        );
         let input2 = filter_exec(parquet_exec());
         let plan = union_exec(vec![input1, input2]);
 
@@ -3481,10 +3480,13 @@ pub(crate) mod tests {
             ("c".to_string(), "c".to_string()),
         ];
         // sorted input
-        let plan = sort_required_exec(projection_exec_with_alias(
-            parquet_exec_multiple_sorted(vec![sort_key]),
-            alias,
-        ));
+        let plan = sort_required_exec_with_req(
+            projection_exec_with_alias(
+                parquet_exec_multiple_sorted(vec![sort_key.clone()]),
+                alias,
+            ),
+            sort_key,
+        );
 
         let expected = &[
             "SortRequiredExec: [c@2 ASC]",
@@ -3639,8 +3641,8 @@ pub(crate) mod tests {
             options: SortOptions::default(),
         }];
 
-        let plan = filter_exec(parquet_exec_multiple_sorted(vec![sort_key]));
-        let plan = sort_required_exec(plan);
+        let plan = 
filter_exec(parquet_exec_multiple_sorted(vec![sort_key.clone()]));
+        let plan = sort_required_exec_with_req(plan, sort_key);
 
         // The groups must have only contiguous ranges of rows from the same 
file
         // if any group has rows from multiple files, the data is no longer 
sorted
@@ -4025,9 +4027,14 @@ pub(crate) mod tests {
         }];
         //  SortRequired
         //    Parquet(sorted)
-        let plan_parquet =
-            sort_required_exec(parquet_exec_with_sort(vec![sort_key.clone()]));
-        let plan_csv = sort_required_exec(csv_exec_with_sort(vec![sort_key]));
+        let plan_parquet = sort_required_exec_with_req(
+            parquet_exec_with_sort(vec![sort_key.clone()]),
+            sort_key.clone(),
+        );
+        let plan_csv = sort_required_exec_with_req(
+            csv_exec_with_sort(vec![sort_key.clone()]),
+            sort_key,
+        );
 
         // no parallelization, because SortRequiredExec doesn't benefit from 
increased parallelism
         let expected_parquet = &[
@@ -4150,7 +4157,7 @@ pub(crate) mod tests {
     }
 
     #[test]
-    fn preserve_ordering_through_repartition() -> Result<()> {
+    fn remove_unnecessary_spm_after_filter() -> Result<()> {
         let schema = schema();
         let sort_key = vec![PhysicalSortExpr {
             expr: col("c", &schema).unwrap(),
@@ -4159,8 +4166,10 @@ pub(crate) mod tests {
         let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
         let physical_plan = sort_preserving_merge_exec(sort_key, 
filter_exec(input));
 
+        // Original plan expects its output to be ordered by c@2 ASC.
+        // This is still satisfied since, after the filter, that column is constant.
         let expected = &[
-            "SortPreservingMergeExec: [c@2 ASC]",
+            "CoalescePartitionsExec",
             "FilterExec: c@2 = 0",
             "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2, preserve_order=true, sort_exprs=c@2 ASC",
             "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[c@2 ASC]",
@@ -4172,6 +4181,29 @@ pub(crate) mod tests {
         Ok(())
     }
 
+    #[test]
+    fn preserve_ordering_through_repartition() -> Result<()> {
+        let schema = schema();
+        let sort_key = vec![PhysicalSortExpr {
+            expr: col("d", &schema).unwrap(),
+            options: SortOptions::default(),
+        }];
+        let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
+        let physical_plan = sort_preserving_merge_exec(sort_key, 
filter_exec(input));
+
+        let expected = &[
+            "SortPreservingMergeExec: [d@3 ASC]",
+            "FilterExec: c@2 = 0",
+            "RepartitionExec: partitioning=RoundRobinBatch(10), 
input_partitions=2, preserve_order=true, sort_exprs=d@3 ASC",
+            "ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, 
b, c, d, e], output_ordering=[d@3 ASC]",
+        ];
+        // last flag sets config.optimizer.PREFER_EXISTING_SORT
+        assert_optimized!(expected, physical_plan.clone(), true, true);
+        assert_optimized!(expected, physical_plan, false, true);
+
+        Ok(())
+    }
+
     #[test]
     fn do_not_preserve_ordering_through_repartition() -> Result<()> {
         let schema = schema();
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index 5eb9d6eb1b..7ce540b267 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -134,6 +134,16 @@ impl EquivalenceProperties {
         &self.constants
     }
 
+    /// Returns the output ordering of the properties.
+    pub fn output_ordering(&self) -> Option<LexOrdering> {
+        let constants = self.constants();
+        let mut output_ordering = 
self.oeq_class().output_ordering().unwrap_or_default();
+        // Prune out constant expressions
+        output_ordering
+            .retain(|sort_expr| !physical_exprs_contains(constants, 
&sort_expr.expr));
+        (!output_ordering.is_empty()).then_some(output_ordering)
+    }
+
     /// Returns the normalized version of the ordering equivalence class 
within.
     /// Normalization removes constants and duplicates as well as standardizing
     /// expressions according to the equivalence group within.
diff --git a/datafusion/physical-plan/src/lib.rs 
b/datafusion/physical-plan/src/lib.rs
index 3e8e439c9a..3decf2e340 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -574,7 +574,7 @@ impl PlanProperties {
         execution_mode: ExecutionMode,
     ) -> Self {
         // Output ordering can be derived from `eq_properties`.
-        let output_ordering = eq_properties.oeq_class().output_ordering();
+        let output_ordering = eq_properties.output_ordering();
         Self {
             eq_properties,
             partitioning,
@@ -599,7 +599,7 @@ impl PlanProperties {
     pub fn with_eq_properties(mut self, eq_properties: EquivalenceProperties) 
-> Self {
         // Changing equivalence properties also changes output ordering, so
         // make sure to overwrite it:
-        self.output_ordering = eq_properties.oeq_class().output_ordering();
+        self.output_ordering = eq_properties.output_ordering();
         self.eq_properties = eq_properties;
         self
     }

Reply via email to