This is an automated email from the ASF dual-hosted git repository.
akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f305a0894e Add constant expression support to equivalence properties
(#9198)
f305a0894e is described below
commit f305a0894ed510eaf574957ab6771aa277e025c8
Author: Mustafa Akur <[email protected]>
AuthorDate: Tue Feb 13 09:05:14 2024 +0300
Add constant expression support to equivalence properties (#9198)
* Consider constants in the equivalence
* Resolve linter errors
* Minor changes
* Add new test
---
.../physical-expr/src/equivalence/properties.rs | 48 ++++++++++++++++++---
datafusion/sqllogictest/test_files/order.slt | 8 ++--
datafusion/sqllogictest/test_files/subquery.slt | 4 +-
datafusion/sqllogictest/test_files/union.slt | 8 ++--
datafusion/sqllogictest/test_files/window.slt | 49 ++++++++++++++++++++++
5 files changed, 101 insertions(+), 16 deletions(-)
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs
b/datafusion/physical-expr/src/equivalence/properties.rs
index 2471d9249e..386e74b4a6 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -31,7 +31,7 @@ use crate::{
PhysicalSortRequirement,
};
-use arrow_schema::SchemaRef;
+use arrow_schema::{SchemaRef, SortOptions};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{JoinSide, JoinType};
@@ -710,10 +710,16 @@ impl EquivalenceProperties {
.flat_map(|&idx| {
let ExprOrdering { expr, data, .. } =
eq_properties.get_expr_ordering(exprs[idx].clone());
- if let SortProperties::Ordered(options) = data {
- Some((PhysicalSortExpr { expr, options }, idx))
- } else {
- None
+ match data {
+ SortProperties::Ordered(options) => {
+ Some((PhysicalSortExpr { expr, options }, idx))
+ }
+ SortProperties::Singleton => {
+ // Assign default ordering to constant expressions
+ let options = SortOptions::default();
+ Some((PhysicalSortExpr { expr, options }, idx))
+ }
+ SortProperties::Unordered => None,
}
})
.collect::<Vec<_>>();
@@ -839,7 +845,7 @@ fn is_constant_recurse(
constants: &[Arc<dyn PhysicalExpr>],
expr: &Arc<dyn PhysicalExpr>,
) -> bool {
- if physical_exprs_contains(constants, expr) {
+ if physical_exprs_contains(constants, expr) ||
expr.as_any().is::<Literal>() {
return true;
}
let children = expr.children();
@@ -1881,6 +1887,36 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_find_longest_permutation2() -> Result<()> {
+ // Schema satisfies following orderings:
+ // [a ASC], [d ASC, b ASC], [e DESC, f ASC, g ASC]
+ // and
+ // Column [a=c] (i.e. they are aliases).
+ // At below we add [d ASC, h DESC] also, for test purposes
+ let (test_schema, mut eq_properties) = create_test_params()?;
+ let col_h = &col("h", &test_schema)?;
+
+ // Add column h as constant
+ eq_properties = eq_properties.add_constants(vec![col_h.clone()]);
+
+ let test_cases = vec![
+ // TEST CASE 1
+ // The ordering of constants is treated as the default ordering.
+ // This is the convention currently used.
+ (vec![col_h], vec![(col_h, SortOptions::default())]),
+ ];
+ for (exprs, expected) in test_cases {
+ let exprs = exprs.into_iter().cloned().collect::<Vec<_>>();
+ let expected = convert_to_sort_exprs(&expected);
+ let (actual, _) = eq_properties.find_longest_permutation(&exprs);
+ assert_eq!(actual, expected);
+ }
+
+ Ok(())
+ }
+
#[test]
fn test_get_meet_ordering() -> Result<()> {
let schema = create_test_schema()?;
diff --git a/datafusion/sqllogictest/test_files/order.slt
b/datafusion/sqllogictest/test_files/order.slt
index 61888eb800..2ea78448b9 100644
--- a/datafusion/sqllogictest/test_files/order.slt
+++ b/datafusion/sqllogictest/test_files/order.slt
@@ -769,18 +769,18 @@ SortPreservingMergeExec: [m@0 ASC NULLS LAST,t@1 ASC
NULLS LAST]
--SortExec: expr=[m@0 ASC NULLS LAST,t@1 ASC NULLS LAST]
----InterleaveExec
------ProjectionExec: expr=[Int64(0)@0 as m, t@1 as t]
---------AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1
as t], aggr=[]
+--------AggregateExec: mode=FinalPartitioned, gby=[Int64(0)@0 as Int64(0), t@1
as t], aggr=[], ordering_mode=PartiallySorted([0])
----------CoalesceBatchesExec: target_batch_size=8192
------------RepartitionExec: partitioning=Hash([Int64(0)@0, t@1], 2),
input_partitions=2
--------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-----------------AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t],
aggr=[]
+----------------AggregateExec: mode=Partial, gby=[0 as Int64(0), t@0 as t],
aggr=[], ordering_mode=PartiallySorted([0])
------------------ProjectionExec: expr=[column1@0 as t]
--------------------ValuesExec
------ProjectionExec: expr=[Int64(1)@0 as m, t@1 as t]
---------AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1
as t], aggr=[]
+--------AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1), t@1
as t], aggr=[], ordering_mode=PartiallySorted([0])
----------CoalesceBatchesExec: target_batch_size=8192
------------RepartitionExec: partitioning=Hash([Int64(1)@0, t@1], 2),
input_partitions=2
--------------RepartitionExec: partitioning=RoundRobinBatch(2),
input_partitions=1
-----------------AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t],
aggr=[]
+----------------AggregateExec: mode=Partial, gby=[1 as Int64(1), t@0 as t],
aggr=[], ordering_mode=PartiallySorted([0])
------------------ProjectionExec: expr=[column1@0 as t]
--------------------ValuesExec
diff --git a/datafusion/sqllogictest/test_files/subquery.slt
b/datafusion/sqllogictest/test_files/subquery.slt
index 1ca9045f1b..1f2870a8c9 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -264,10 +264,10 @@ ProjectionExec: expr=[t1_id@0 as t1_id, SUM(t2.t2_int)@1
as t2_sum]
------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([t2_id@1], 4), input_partitions=4
----------ProjectionExec: expr=[SUM(t2.t2_int)@2 as SUM(t2.t2_int), t2_id@0 as
t2_id]
-------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id,
Utf8("a")@1 as Utf8("a")], aggr=[SUM(t2.t2_int)]
+------------AggregateExec: mode=FinalPartitioned, gby=[t2_id@0 as t2_id,
Utf8("a")@1 as Utf8("a")], aggr=[SUM(t2.t2_int)],
ordering_mode=PartiallySorted([1])
--------------CoalesceBatchesExec: target_batch_size=2
----------------RepartitionExec: partitioning=Hash([t2_id@0, Utf8("a")@1], 4),
input_partitions=4
-------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id, a as
Utf8("a")], aggr=[SUM(t2.t2_int)]
+------------------AggregateExec: mode=Partial, gby=[t2_id@0 as t2_id, a as
Utf8("a")], aggr=[SUM(t2.t2_int)], ordering_mode=PartiallySorted([1])
--------------------MemoryExec: partitions=4, partition_sizes=[1, 0, 0, 0]
query II rowsort
diff --git a/datafusion/sqllogictest/test_files/union.slt
b/datafusion/sqllogictest/test_files/union.slt
index b4e338875e..5550fa68a6 100644
--- a/datafusion/sqllogictest/test_files/union.slt
+++ b/datafusion/sqllogictest/test_files/union.slt
@@ -547,10 +547,10 @@ Union
physical_plan
UnionExec
--ProjectionExec: expr=[Int64(1)@0 as a]
-----AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1)], aggr=[]
+----AggregateExec: mode=FinalPartitioned, gby=[Int64(1)@0 as Int64(1)],
aggr=[], ordering_mode=Sorted
------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([Int64(1)@0], 4), input_partitions=1
-----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[]
+----------AggregateExec: mode=Partial, gby=[1 as Int64(1)], aggr=[],
ordering_mode=Sorted
------------PlaceholderRowExec
--ProjectionExec: expr=[2 as a]
----PlaceholderRowExec
@@ -578,10 +578,10 @@ Union
physical_plan
UnionExec
--ProjectionExec: expr=[COUNT(*)@1 as count, n@0 as n]
-----AggregateExec: mode=FinalPartitioned, gby=[n@0 as n], aggr=[COUNT(*)]
+----AggregateExec: mode=FinalPartitioned, gby=[n@0 as n], aggr=[COUNT(*)],
ordering_mode=Sorted
------CoalesceBatchesExec: target_batch_size=2
--------RepartitionExec: partitioning=Hash([n@0], 4), input_partitions=1
-----------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(*)]
+----------AggregateExec: mode=Partial, gby=[n@0 as n], aggr=[COUNT(*)],
ordering_mode=Sorted
------------ProjectionExec: expr=[5 as n]
--------------PlaceholderRowExec
--ProjectionExec: expr=[1 as count, MAX(Int64(10))@0 as n]
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index afdd4a9b48..fa4445d4cd 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -4053,3 +4053,52 @@ SELECT c3,
125 217013822852 8701233618
123 213388536442 11293564174
97 210796205886 14767488750
+
+statement ok
+create table a (a bigint) AS VALUES (1);
+
+query I
+select count(*) over (partition by a order by a) from (select * from a where a
= 1);
+----
+1
+
+query TT
+EXPLAIN select count(*) over (partition by a order by a) from (select * from a
where a = 1);
+----
+logical_plan
+Projection: COUNT(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
+--WindowAggr: windowExpr=[[COUNT(UInt8(1)) PARTITION BY [a.a] ORDER BY [a.a
ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS COUNT(*)
PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED
PRECEDING AND CURRENT ROW]]
+----Filter: a.a = Int64(1)
+------TableScan: a projection=[a]
+physical_plan
+ProjectionExec: expr=[COUNT(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS
LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@1 as COUNT(*) PARTITION
BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND
CURRENT ROW]
+--BoundedWindowAggExec: wdw=[COUNT(*) PARTITION BY [a.a] ORDER BY [a.a ASC
NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name:
"COUNT(*) PARTITION BY [a.a] ORDER BY [a.a ASC NULLS LAST] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true,
dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame {
units: Range, start_bound: Preceding(Int64(NULL)), end_bound: CurrentRow }],
mode=[Sorted]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------CoalesceBatchesExec: target_batch_size=4096
+------------FilterExec: a@0 = 1
+--------------MemoryExec: partitions=1, partition_sizes=[1]
+
+query I
+select ROW_NUMBER() over (partition by a) from (select * from a where a = 1);
+----
+1
+
+query TT
+EXPLAIN select ROW_NUMBER() over (partition by a) from (select * from a where
a = 1);
+----
+logical_plan
+Projection: ROW_NUMBER() PARTITION BY [a.a] ROWS BETWEEN UNBOUNDED PRECEDING
AND UNBOUNDED FOLLOWING
+--WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [a.a] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+----Filter: a.a = Int64(1)
+------TableScan: a projection=[a]
+physical_plan
+ProjectionExec: expr=[ROW_NUMBER() PARTITION BY [a.a] ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING@1 as ROW_NUMBER() PARTITION BY [a.a] ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]
+--BoundedWindowAggExec: wdw=[ROW_NUMBER() PARTITION BY [a.a] ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER()
PARTITION BY [a.a] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING",
data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false,
metadata: {} }), frame: WindowFrame { units: Rows, start_bound:
Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }], mode=[Sorted]
+----CoalesceBatchesExec: target_batch_size=4096
+------RepartitionExec: partitioning=Hash([a@0], 2), input_partitions=2
+--------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
+----------CoalesceBatchesExec: target_batch_size=4096
+------------FilterExec: a@0 = 1
+--------------MemoryExec: partitions=1, partition_sizes=[1]