This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f305a0894e Add constant expression support to equivalence properties 
(#9198)
f305a0894e is described below

commit f305a0894ed510eaf574957ab6771aa277e025c8
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Feb 13 09:05:14 2024 +0300

    Add constant expression support to equivalence properties (#9198)
    
    * Consider constants in the equivalence
    
    * Resolve linter errors
    
    * Minor changes
    
    * Add new test
---
 .../physical-expr/src/equivalence/properties.rs    | 48 ++++++++++++++++++---
 datafusion/sqllogictest/test_files/order.slt       |  8 ++--
 datafusion/sqllogictest/test_files/subquery.slt    |  4 +-
 datafusion/sqllogictest/test_files/union.slt       |  8 ++--
 datafusion/sqllogictest/test_files/window.slt      | 49 ++++++++++++++++++++++
 5 files changed, 101 insertions(+), 16 deletions(-)

diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index 2471d9249e..386e74b4a6 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -31,7 +31,7 @@ use crate::{
     PhysicalSortRequirement,
 };
 
-use arrow_schema::SchemaRef;
+use arrow_schema::{SchemaRef, SortOptions};
 use datafusion_common::tree_node::{Transformed, TreeNode};
 use datafusion_common::{JoinSide, JoinType};
 
@@ -710,10 +710,16 @@ impl EquivalenceProperties {
                 .flat_map(|&idx| {
                     let ExprOrdering { expr, data, .. } =
                         eq_properties.get_expr_ordering(exprs[idx].clone());
-                    if let SortProperties::Ordered(options) = data {
-                        Some((PhysicalSortExpr { expr, options }, idx))
-                    } else {
-                        None
+                    match data {
+                        SortProperties::Ordered(options) => {
+                            Some((PhysicalSortExpr { expr, options }, idx))
+                        }
+                        SortProperties::Singleton => {
+                            // Assign default ordering to constant expressions
+                            let options = SortOptions::default();
+                            Some((PhysicalSortExpr { expr, options }, idx))
+                        }
+                        SortProperties::Unordered => None,
                     }
                 })
                 .collect::<Vec<_>>();
@@ -839,7 +845,7 @@ fn is_constant_recurse(
     constants: &[Arc<dyn PhysicalExpr>],
     expr: &Arc<dyn PhysicalExpr>,
 ) -> bool {
-    if physical_exprs_contains(constants, expr) {
+    if physical_exprs_contains(constants, expr) || 
expr.as_any().is::<Literal>() {
         return true;
     }
     let children = expr.children();
@@ -1881,6 +1887,36 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_find_longest_permutation2() -> Result<()> {
+        // Schema satisfies following orderings:
+        // [a ASC], [d ASC, b ASC], [e DESC, f ASC, g ASC]
+        // and
+        // Column [a=c] (e.g they are aliases).
+        // At below we add [d ASC, h DESC] also, for test purposes
+        let (test_schema, mut eq_properties) = create_test_params()?;
+        let col_h = &col("h", &test_schema)?;
+
+        // Add column h as constant
+        eq_properties = eq_properties.add_constants(vec![col_h.clone()]);
+
+        let test_cases = vec![
+            // TEST CASE 1
+            // ordering of the constants are treated as default ordering.
+            // This is the convention currently used.
+            (vec![col_h], vec![(col_h, SortOptions::default())]),
+        ];
+        for (exprs, expected) in test_cases {
+            let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
+            let expected = convert_to_sort_exprs(&expected);
+            let (actual, _) = eq_properties.find_longest_permutation(&exprs);
+            assert_eq!(actual, expected);
+        }
+
+        Ok(())
+    }
+
     #[test]
     fn test_get_meet_ordering() -> Result<()> {
         let schema = create_test_schema()?;
diff --git a/datafusion/sqllogictest/test_files/order.slt 
b/datafusion/sqllogictest/test_files/order.slt
index 61888eb800..2ea78448b9 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -769,18 +769,18 @@ SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC 
NULLS LAST]
 --SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
 ----InterleaveExec
 ------ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]
---------AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 
as t], aggr=[]
+--------AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1 
as t], aggr=[], ordering_mode=PartiallySorted([0])
 ----------CoalesceBatchesExec: target_batch_size=8192
 ------------RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2), 
input_partitions=2
 --------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-----------------AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], 
aggr=[]
+----------------AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t], 
aggr=[], ordering_mode=PartiallySorted([0])
 ------------------ProjectionExec: expr=[column1@0 as t]
 --------------------ValuesExec
 ------ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]
---------AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 
as t], aggr=[]
+--------AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1 
as t], aggr=[], ordering_mode=PartiallySorted([0])
 ----------CoalesceBatchesExec: target_batch_size=8192
 ------------RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2), 
input_partitions=2
 --------------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-----------------AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], 
aggr=[]
+----------------AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t], 
aggr=[], ordering_mode=PartiallySorted([0])
 ------------------ProjectionExec: expr=[column1@0 as t]
 --------------------ValuesExec
diff --git a/datafusion/sqllogictest/test_files/subquery.slt 
b/datafusion/sqllogictest/test_files/subquery.slt
index 1ca9045f1b..1f2870a8c9 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -264,10 +264,10 @@ ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1 
as t2_sum]
 ------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([t2_id@1], 4), input_partitions=4
 ----------ProjectionExec: expr=[SUM(t2.t2_int)@2 as SUM(t2.t2_int), t2_id@0 as 
t2_id]
-------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id, 
Utf8("a")@1 as Utf8("a")], aggr=[SUM(t2.t2_int)]
+------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id, 
Utf8("a")@1 as Utf8("a")], aggr=[SUM(t2.t2_int)], 
ordering_mode=PartiallySorted([1])
 --------------CoalesceBatchesExec: target_batch_size=2
 ----------------RepartitionExec: partitioning=Hash([t2_id@0, Utf8("a")@1], 4), 
input_partitions=4
-------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id, a as 
Utf8("a")], aggr=[SUM(t2.t2_int)]
+------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id, a as 
Utf8("a")], aggr=[SUM(t2.t2_int)], ordering_mode=PartiallySorted([1])
 --------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
 
 query II rowsort
diff --git a/datafusion/sqllogictest/test_files/union.slt 
b/datafusion/sqllogictest/test_files/union.slt
index b4e338875e..5550fa68a6 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -547,10 +547,10 @@ Union
 physical_plan
 UnionExec
 --ProjectionExec: expr=[Int64(1)@0 as a]
-----AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1)], aggr=[]
+----AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1)], 
aggr=[], ordering_mode=Sorted
 ------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=1
-----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[]
+----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[], 
ordering_mode=Sorted
 ------------PlaceholderRowExec
 --ProjectionExec: expr=[2 as a]
 ----PlaceholderRowExec
@@ -578,10 +578,10 @@ Union
 physical_plan
 UnionExec
 --ProjectionExec: expr=[COUNT(*)@1 as count, n@0 as n]
-----AggregateExec: mode=FinalPartitioned, gby=[n@0 as n], aggr=[COUNT(*)]
+----AggregateExec: mode=FinalPartitioned, gby=[n@0 as n], aggr=[COUNT(*)], 
ordering_mode=Sorted
 ------CoalesceBatchesExec: target_batch_size=2
 --------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=1
-----------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(*)]
+----------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(*)], 
ordering_mode=Sorted
 ------------ProjectionExec: expr=[5 as n]
 --------------PlaceholderRowExec
 --ProjectionExec: expr=[1 as count, MAX(Int64(10))@0 as n]
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index afdd4a9b48..fa4445d4cd 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -4053,3 +4053,52 @@ SELECT c3,
 125 217013822852 8701233618
 123 213388536442 11293564174
 97 210796205886 14767488750
+
+statement ok
+create table a (a bigint) AS VALUES (1);
+
+query I
+select count(*) over (partition by a order by a) from (select * from a where a 
= 1);
+----
+1
+
+query TT
+EXPLAIN select count(*) over (partition by a order by a) from (select * from a 
where a = 1);
+----
+logical_plan
+Projection: COUNT(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+--WindowAggr: windowExpr=[[COUNT(UInt8(1)) PARTITION BY [a.a] ORDER BY [a.a 
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS COUNT(*) 
PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED 
PRECEDING AND CURRENT ROW]]
+----Filter: a.a = Int64(1)
+------TableScan: a projection=[a]
+physical_plan
+ProjectionExec: expr=[COUNT(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS 
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as COUNT(*) PARTITION 
BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW]
+--BoundedWindowAggExec: wdw=[COUNT(*) PARTITION BY [a.a] ORDER BY [a.a ASC 
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: 
"COUNT(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { 
units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }], 
mode=[Sorted]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------CoalesceBatchesExec: target_batch_size=4096
+------------FilterExec: a@0 = 1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query I
+select ROW_NUMBER() over (partition by a) from (select * from a where a = 1);
+----
+1
+
+query TT
+EXPLAIN select ROW_NUMBER() over (partition by a) from (select * from a where 
a = 1);
+----
+logical_plan
+Projection: ROW_NUMBER() PARTITION BY [a.a] ROWS BETWEEN UNBOUNDED PRECEDING 
AND UNBOUNDED FOLLOWING
+--WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [a.a] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+----Filter: a.a = Int64(1)
+------TableScan: a projection=[a]
+physical_plan
+ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [a.a] ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING@1 as ROW_NUMBER() PARTITION BY [a.a] ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]
+--BoundedWindowAggExec: wdw=[ROW_NUMBER() PARTITION BY [a.a] ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() 
PARTITION BY [a.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", 
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, 
metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------CoalesceBatchesExec: target_batch_size=4096
+------------FilterExec: a@0 = 1
+--------------MemoryExec: partitions=1, partition_sizes=[1]

Reply via email to