findepi commented on code in PR #17684:
URL: https://github.com/apache/datafusion/pull/17684#discussion_r2366146307
##########
datafusion/physical-plan/src/windows/mod.rs:
##########
@@ -371,18 +371,46 @@ pub(crate) fn window_equivalence_properties(
for (i, expr) in window_exprs.iter().enumerate() {
let partitioning_exprs = expr.partition_by();
let no_partitioning = partitioning_exprs.is_empty();
- // Collect columns defining partitioning, and construct all
`SortOptions`
- // variations for them. Then, we will check each one whether it
satisfies
- // the existing ordering provided by the input plan.
+
+ // Use incremental approach to avoid O(4^n) exponential complexity:
+ // Instead of generating all combinations upfront via
multi_cartesian_product,
+ // we build orderings incrementally and prune invalid paths early.
let mut all_satisfied_lexs = vec![];
- for lex in partitioning_exprs
- .iter()
- .map(|pb_order|
sort_options_resolving_constant(Arc::clone(pb_order)))
- .multi_cartesian_product()
- .filter_map(LexOrdering::new)
- {
- if window_eq_properties.ordering_satisfy(lex.clone())? {
- all_satisfied_lexs.push(lex);
+ if !no_partitioning {
+ // Start with empty orderings that we'll extend incrementally
+ let mut current_orderings = vec![vec![]];
+ for partition_expr in partitioning_exprs.iter() {
+ let mut next_orderings = vec![];
+
+ let sort_options =
+
sort_options_resolving_constant(Arc::clone(partition_expr), true);
+
+ // For each current partial ordering, try extending with each
sort option
+ for current in current_orderings.iter() {
+ for sort_expr in sort_options.iter() {
+ let mut extended = current.clone();
+ extended.push(sort_expr.clone());
Review Comment:
In https://github.com/apache/datafusion/pull/17563 we have this example query
```
# regression test for https://github.com/apache/datafusion/issues/17401
query I
WITH source AS (
SELECT
1 AS n,
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS
a7, '' AS a8,
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, ''
AS a15, '' AS a16,
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, ''
AS a23, '' AS a24,
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, ''
AS a31, '' AS a32,
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, ''
AS a39, '' AS a40
)
SELECT
sum(n) OVER (PARTITION BY
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15,
a16, a17, a18, a19, a20,
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33,
a34, a35, a36, a37, a38, a39, a40
)
FROM source;
```
With the current PR code it never completes, but let's ignore this for a moment.
If I read the code correctly, it "prunes early" when the next partitioning
expression doesn't provide more benefits. So I thought that it won't prune in
some cases. Perhaps when the input is pre-partitioned?
Let's add a test query:
```
WITH source AS (
SELECT
1 AS n,
'' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS
a7, '' AS a8,
'' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, ''
AS a15, '' AS a16,
'' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, ''
AS a23, '' AS a24,
'' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, ''
AS a31, '' AS a32,
'' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, ''
AS a39, '' AS a40
)
SELECT
sum(n) OVER (PARTITION BY
a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15,
a16, a17, a18, a19, a20,
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33,
a34, a35, a36, a37, a38, a39, a40
)
FROM (SELECT * FROM source ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10,
a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33,
a34, a35, a36, a37, a38, a39, a40);
```
##########
datafusion/physical-plan/src/windows/mod.rs:
##########
@@ -371,18 +371,46 @@ pub(crate) fn window_equivalence_properties(
for (i, expr) in window_exprs.iter().enumerate() {
let partitioning_exprs = expr.partition_by();
let no_partitioning = partitioning_exprs.is_empty();
- // Collect columns defining partitioning, and construct all
`SortOptions`
- // variations for them. Then, we will check each one whether it
satisfies
- // the existing ordering provided by the input plan.
+
+ // Use incremental approach to avoid O(4^n) exponential complexity:
+ // Instead of generating all combinations upfront via
multi_cartesian_product,
+ // we build orderings incrementally and prune invalid paths early.
let mut all_satisfied_lexs = vec![];
- for lex in partitioning_exprs
- .iter()
- .map(|pb_order|
sort_options_resolving_constant(Arc::clone(pb_order)))
- .multi_cartesian_product()
- .filter_map(LexOrdering::new)
- {
- if window_eq_properties.ordering_satisfy(lex.clone())? {
- all_satisfied_lexs.push(lex);
+ if !no_partitioning {
+ // Start with empty orderings that we'll extend incrementally
+ let mut current_orderings = vec![vec![]];
+ for partition_expr in partitioning_exprs.iter() {
+ let mut next_orderings = vec![];
+
+ let sort_options =
+
sort_options_resolving_constant(Arc::clone(partition_expr), true);
+
+ // For each current partial ordering, try extending with each
sort option
+ for current in current_orderings.iter() {
+ for sort_expr in sort_options.iter() {
+ let mut extended = current.clone();
+ extended.push(sort_expr.clone());
Review Comment:
However, I am not sure I correctly understand the pruning condition, or
what the condition for _not pruning_ is. Please elaborate.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]