This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ffc8444501 feat: eliminate redundant sorts on monotonic expressions 
(#9813)
ffc8444501 is described below

commit ffc84445015eb69b2d3352304ec09fbc6267f282
Author: Matthew Cramerus <[email protected]>
AuthorDate: Fri Apr 12 16:39:06 2024 -0500

    feat: eliminate redundant sorts on monotonic expressions (#9813)
    
    * initial impl
    
    * add comments & fix name
    
    * even more comments
    
    * add negative test
    
    * updated sqllogictest
    
    * make the test easier to read
    
    * add collapse_monotonic_lex_req into collapse_lex_req
    
    * more tests in sqllogictest
    
    * another test in sqllogictest
    
    * add yet another negative test case
---
 datafusion/physical-expr/src/equivalence/mod.rs    |  48 +++++++++-
 .../physical-expr/src/equivalence/properties.rs    |  82 +++++++++++++++++
 .../test_files/filter_without_sort_exec.slt        | 102 ++++++++++++++++++++-
 3 files changed, 226 insertions(+), 6 deletions(-)

diff --git a/datafusion/physical-expr/src/equivalence/mod.rs 
b/datafusion/physical-expr/src/equivalence/mod.rs
index 46909f2361..fd8123c45b 100644
--- a/datafusion/physical-expr/src/equivalence/mod.rs
+++ b/datafusion/physical-expr/src/equivalence/mod.rs
@@ -18,6 +18,7 @@
 use std::sync::Arc;
 
 use crate::expressions::Column;
+use crate::sort_properties::SortProperties;
 use crate::{LexRequirement, PhysicalExpr, PhysicalSortRequirement};
 
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
@@ -35,6 +36,10 @@ pub use properties::{join_equivalence_properties, 
EquivalenceProperties};
 /// This function constructs a duplicate-free `LexOrderingReq` by filtering out
 /// duplicate entries that have same physical expression inside. For example,
 /// `vec![a Some(ASC), a Some(DESC)]` collapses to `vec![a Some(ASC)]`.
+///
+/// It will also filter out entries whose ordering is implied by the next entry;
+/// for instance, `vec![floor(a) Some(ASC), a Some(ASC)]` will be collapsed to
+/// `vec![a Some(ASC)]`.
 pub fn collapse_lex_req(input: LexRequirement) -> LexRequirement {
     let mut output = Vec::<PhysicalSortRequirement>::new();
     for item in input {
@@ -42,7 +47,48 @@ pub fn collapse_lex_req(input: LexRequirement) -> 
LexRequirement {
             output.push(item);
         }
     }
-    output
+    collapse_monotonic_lex_req(output)
+}
+
+/// This function constructs a normalized [`LexRequirement`] by filtering out 
entries
+/// whose ordering is already implied by the entry that immediately follows.
+/// Used in `collapse_lex_req`.
+fn collapse_monotonic_lex_req(input: LexRequirement) -> LexRequirement {
+    input
+        .iter()
+        .enumerate()
+        .filter_map(|(i, item)| {
+            // If it's the last entry, there is no next entry
+            if i == input.len() - 1 {
+                return Some(item);
+            }
+            let next_expr = &input[i + 1];
+
+            // Only handle expressions with exactly one child
+            // TODO: it should be possible to handle expression orderings like 
f(a, b, c), a, b, c
+            // if f is monotonic in all arguments
+            if !(item.expr.children().len() == 1
+                && item.expr.children()[0].eq(&next_expr.expr))
+            {
+                return Some(item);
+            }
+
+            let opts = match next_expr.options {
+                None => return Some(item),
+                Some(opts) => opts,
+            };
+
+            if item.options.map(SortProperties::Ordered)
+                == 
Some(item.expr.get_ordering(&[SortProperties::Ordered(opts)]))
+            {
+                // Remove the redundant sort
+                return None;
+            }
+
+            Some(item)
+        })
+        .cloned()
+        .collect::<Vec<_>>()
 }
 
 /// Adds the `offset` value to `Column` indices inside `expr`. This function is
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index c14c88d6c6..58ef5ec797 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -2212,6 +2212,88 @@ mod tests {
             );
         }
 
+        Ok(())
+    }
+    #[test]
+    fn test_eliminate_redundant_monotonic_sorts() -> Result<()> {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Date32, true),
+            Field::new("b", DataType::Utf8, true),
+            Field::new("c", DataType::Timestamp(TimeUnit::Nanosecond, None), 
true),
+        ]));
+        let base_properties = 
EquivalenceProperties::new(schema.clone()).with_reorder(
+            ["a", "b", "c"]
+                .into_iter()
+                .map(|c| {
+                    col(c, schema.as_ref()).map(|expr| PhysicalSortExpr {
+                        expr,
+                        options: SortOptions {
+                            descending: false,
+                            nulls_first: true,
+                        },
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?,
+        );
+
+        struct TestCase {
+            name: &'static str,
+            constants: Vec<Arc<dyn PhysicalExpr>>,
+            equal_conditions: Vec<[Arc<dyn PhysicalExpr>; 2]>,
+            sort_columns: &'static [&'static str],
+            should_satisfy_ordering: bool,
+        }
+
+        let col_a = col("a", schema.as_ref())?;
+        let col_b = col("b", schema.as_ref())?;
+        let col_c = col("c", schema.as_ref())?;
+        let cast_c = Arc::new(CastExpr::new(col_c, DataType::Date32, None));
+
+        let cases = vec![
+            TestCase {
+                name: "(a, b, c) -> (c)",
+                // b is constant, so it should be removed from the sort order
+                constants: vec![col_b],
+                equal_conditions: vec![[cast_c.clone(), col_a.clone()]],
+                sort_columns: &["c"],
+                should_satisfy_ordering: true,
+            },
+            TestCase {
+                name: "not ordered because (b) is not constant",
+                // b is not constant anymore
+                constants: vec![],
+                // a and c are still compatible, but this is irrelevant since 
the original ordering is (a, b, c)
+                equal_conditions: vec![[cast_c.clone(), col_a.clone()]],
+                sort_columns: &["c"],
+                should_satisfy_ordering: false,
+            },
+        ];
+
+        for case in cases {
+            let mut properties = 
base_properties.clone().add_constants(case.constants);
+            for [left, right] in &case.equal_conditions {
+                properties.add_equal_conditions(left, right)
+            }
+
+            let sort = case
+                .sort_columns
+                .iter()
+                .map(|&name| {
+                    col(name, &schema).map(|col| PhysicalSortExpr {
+                        expr: col,
+                        options: SortOptions::default(),
+                    })
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            assert_eq!(
+                properties.ordering_satisfy(&sort),
+                case.should_satisfy_ordering,
+                "failed test '{}'",
+                case.name
+            );
+        }
+
         Ok(())
     }
 }
diff --git a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt 
b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
index 05e622db8a..b2cc64e3a7 100644
--- a/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
+++ b/datafusion/sqllogictest/test_files/filter_without_sort_exec.slt
@@ -18,9 +18,9 @@
 # prepare table
 statement ok
 CREATE UNBOUNDED EXTERNAL TABLE data (
-    "date"   VARCHAR, 
+    "date"   DATE, 
     "ticker" VARCHAR, 
-    "time"   VARCHAR,
+    "time"   TIMESTAMP,
 ) STORED AS CSV
 WITH ORDER ("date", "ticker", "time")
 LOCATION './a.parquet';
@@ -43,19 +43,111 @@ SortPreservingMergeExec: [date@0 ASC NULLS LAST,time@2 ASC 
NULLS LAST]
 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 --------StreamingTableExec: partition_sizes=1, projection=[date, ticker, 
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 
ASC NULLS LAST, time@2 ASC NULLS LAST]
 
+# constant ticker, CAST(time AS DATE) = date, order by time
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "time"
+----
+logical_plan
+Sort: data.time ASC NULLS LAST
+--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+SortPreservingMergeExec: [time@2 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, 
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 
ASC NULLS LAST, time@2 ASC NULLS LAST]
+
+# same thing but order by date
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "date"
+----
+logical_plan
+Sort: data.date ASC NULLS LAST
+--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+SortPreservingMergeExec: [date@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, 
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 
ASC NULLS LAST, time@2 ASC NULLS LAST]
+
+# same thing but order by ticker
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "ticker"
+----
+logical_plan
+Sort: data.ticker ASC NULLS LAST
+--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+CoalescePartitionsExec
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, 
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 
ASC NULLS LAST, time@2 ASC NULLS LAST]
+
+# same thing but order by time, date
+query TT
+explain SELECT * FROM data 
+WHERE ticker = 'A' AND CAST(time AS DATE) = date
+ORDER BY "time", "date";
+----
+logical_plan
+Sort: data.time ASC NULLS LAST, data.date ASC NULLS LAST
+--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) = data.date
+----TableScan: data projection=[date, ticker, time]
+physical_plan
+SortPreservingMergeExec: [time@2 ASC NULLS LAST,date@0 ASC NULLS LAST]
+--CoalesceBatchesExec: target_batch_size=8192
+----FilterExec: ticker@1 = A AND CAST(time@2 AS Date32) = date@0
+------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+--------StreamingTableExec: partition_sizes=1, projection=[date, ticker, 
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 
ASC NULLS LAST, time@2 ASC NULLS LAST]
+
+# CAST(time AS DATE) <> date (should require a sort)
+# no physical plan due to sort breaking pipeline
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A' AND CAST(time AS DATE) <> date
+ORDER BY "time"
+----
+logical_plan
+Sort: data.time ASC NULLS LAST
+--Filter: data.ticker = Utf8("A") AND CAST(data.time AS Date32) != data.date
+----TableScan: data projection=[date, ticker, time]
+
+# no relation between time & date
+# should also be pipeline breaking
+query TT
+explain SELECT * FROM data
+WHERE ticker = 'A'
+ORDER BY "time"
+----
+logical_plan
+Sort: data.time ASC NULLS LAST
+--Filter: data.ticker = Utf8("A")
+----TableScan: data projection=[date, ticker, time]
+
 # query
 query TT
 explain SELECT * FROM data 
-WHERE date = 'A' 
+WHERE date = '2006-01-02' 
 ORDER BY "ticker", "time";
 ----
 logical_plan
 Sort: data.ticker ASC NULLS LAST, data.time ASC NULLS LAST
---Filter: data.date = Utf8("A")
+--Filter: data.date = Date32("13150")
 ----TableScan: data projection=[date, ticker, time]
 physical_plan
 SortPreservingMergeExec: [ticker@1 ASC NULLS LAST,time@2 ASC NULLS LAST]
 --CoalesceBatchesExec: target_batch_size=8192
-----FilterExec: date@0 = A
+----FilterExec: date@0 = 13150
 ------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 --------StreamingTableExec: partition_sizes=1, projection=[date, ticker, 
time], infinite_source=true, output_ordering=[date@0 ASC NULLS LAST, ticker@1 
ASC NULLS LAST, time@2 ASC NULLS LAST]

Reply via email to