This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new c1d6f34316 Prevent exponential planning time for Window functions - v2 (#17684)
c1d6f34316 is described below

commit c1d6f343161ad7a3a9a372469fafd529d0085ada
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Sep 25 21:34:24 2025 +0300

    Prevent exponential planning time for Window functions - v2 (#17684)
    
    * fix
    
    * Update mod.rs
    
    * Update mod.rs
    
    * Update mod.rs
    
    * tests copied from v1 pr
    
    * test case from review comment
    
    https://github.com/apache/datafusion/pull/17684#discussion_r2366146307
    
    * one more test case
    
    * Update mod.rs
    
    * Update datafusion/physical-plan/src/windows/mod.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update datafusion/physical-plan/src/windows/mod.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Update mod.rs
    
    * Update mod.rs
    
    ---------
    
    Co-authored-by: Piotr Findeisen <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/benches/sql_planner.rs        |   3 +-
 datafusion/physical-plan/src/windows/mod.rs   | 148 ++++++++++++++++++++------
 datafusion/sqllogictest/test_files/window.slt |  89 ++++++++++++++++
 3 files changed, 209 insertions(+), 31 deletions(-)

diff --git a/datafusion/core/benches/sql_planner.rs b/datafusion/core/benches/sql_planner.rs
index c71191507f..9ae6e1f570 100644
--- a/datafusion/core/benches/sql_planner.rs
+++ b/datafusion/core/benches/sql_planner.rs
@@ -476,7 +476,8 @@ fn criterion_benchmark(c: &mut Criterion) {
         });
     });
 
-    for partitioning_columns in [4, 7, 8] {
+    // It was observed in production that queries with window functions sometimes partition over more than 30 columns
+    for partitioning_columns in [4, 7, 8, 12, 30] {
         c.bench_function(
             &format!(
                 "physical_window_function_partition_by_{partitioning_columns}_on_values"
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 4b4e018fd5..829cadf50a 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -371,17 +371,40 @@ pub(crate) fn window_equivalence_properties(
     for (i, expr) in window_exprs.iter().enumerate() {
         let partitioning_exprs = expr.partition_by();
         let no_partitioning = partitioning_exprs.is_empty();
-        // Collect columns defining partitioning, and construct all `SortOptions`
-        // variations for them. Then, we will check each one whether it satisfies
-        // the existing ordering provided by the input plan.
+
+        // Find "one" valid ordering for partition columns to avoid exponential complexity.
+        // see https://github.com/apache/datafusion/issues/17401
         let mut all_satisfied_lexs = vec![];
-        for lex in partitioning_exprs
-            .iter()
-            .map(|pb_order| sort_options_resolving_constant(Arc::clone(pb_order)))
-            .multi_cartesian_product()
-            .filter_map(LexOrdering::new)
-        {
-            if window_eq_properties.ordering_satisfy(lex.clone())? {
+        let mut candidate_ordering = vec![];
+
+        for partition_expr in partitioning_exprs.iter() {
+            let sort_options =
+                sort_options_resolving_constant(Arc::clone(partition_expr), true);
+
+            // Try each sort option and pick the first one that works
+            let mut found = false;
+            for sort_expr in sort_options.into_iter() {
+                candidate_ordering.push(sort_expr);
+                if let Some(lex) = LexOrdering::new(candidate_ordering.clone()) {
+                    if window_eq_properties.ordering_satisfy(lex)? {
+                        found = true;
+                        break;
+                    }
+                }
+                // This option didn't work, remove it and try the next one
+                candidate_ordering.pop();
+            }
+            // If no sort option works for this column, we can't build a valid ordering
+            if !found {
+                candidate_ordering.clear();
+                break;
+            }
+        }
+
+        // If we successfully built an ordering for all columns, use it
+        // When there are no partition expressions, candidate_ordering will be empty and won't be added
+        if candidate_ordering.len() == partitioning_exprs.len() {
+            if let Some(lex) = LexOrdering::new(candidate_ordering) {
                 all_satisfied_lexs.push(lex);
             }
         }
@@ -410,8 +433,10 @@ pub(crate) fn window_equivalence_properties(
                     // Window function results in a partial constant value in
                     // some ordering. Adjust the ordering equivalences accordingly:
                     let new_lexs = all_satisfied_lexs.into_iter().flat_map(|lex| {
-                        let new_partial_consts =
-                            sort_options_resolving_constant(Arc::clone(&window_col));
+                        let new_partial_consts = sort_options_resolving_constant(
+                            Arc::clone(&window_col),
+                            false,
+                        );
 
                         new_partial_consts.into_iter().map(move |partial| {
                             let mut existing = lex.clone();
@@ -467,23 +492,52 @@ pub(crate) fn window_equivalence_properties(
                 // utilize set-monotonicity since the set shrinks as the frame
                 // boundary starts "touching" the end of the table.
                 else if frame.is_causal() {
-                    let args_all_lexs = sliding_expr
-                        .get_aggregate_expr()
-                        .expressions()
-                        .into_iter()
-                        .map(sort_options_resolving_constant)
-                        .multi_cartesian_product();
-
-                    let (mut asc, mut satisfied) = (false, false);
-                    for order in args_all_lexs {
-                        if let Some(f) = order.first() {
-                            asc = !f.options.descending;
+                    // Find one valid ordering for aggregate arguments instead of
+                    // checking all combinations
+                    let aggregate_exprs = sliding_expr.get_aggregate_expr().expressions();
+                    let mut candidate_order = vec![];
+                    let mut asc = false;
+
+                    for (idx, expr) in aggregate_exprs.iter().enumerate() {
+                        let mut found = false;
+                        let sort_options =
+                            sort_options_resolving_constant(Arc::clone(expr), false);
+
+                        // Try each option and pick the first that works
+                        for sort_expr in sort_options.into_iter() {
+                            let is_asc = !sort_expr.options.descending;
+                            candidate_order.push(sort_expr);
+
+                            if let Some(lex) = LexOrdering::new(candidate_order.clone()) {
+                                if window_eq_properties.ordering_satisfy(lex)? {
+                                    if idx == 0 {
+                                        // The first column's ordering direction determines the overall
+                                        // monotonicity behavior of the window result.
+                                        // - If the aggregate has increasing set monotonicity (e.g., MAX, COUNT)
+                                        //   and the first arg is ascending, the window result is increasing
+                                        // - If the aggregate has decreasing set monotonicity (e.g., MIN)
+                                        //   and the first arg is ascending, the window result is also increasing
+                                        // This flag is used to determine the final window column ordering.
+                                        asc = is_asc;
+                                    }
+                                    found = true;
+                                    break;
+                                }
+                            }
+                            // This option didn't work, remove it and try the next one
+                            candidate_order.pop();
                         }
-                        if window_eq_properties.ordering_satisfy(order)? {
-                            satisfied = true;
+
+                        // If we couldn't extend the ordering, stop trying
+                        if !found {
                             break;
                         }
                     }
+
+                    // Check if we successfully built a complete ordering
+                    let satisfied = candidate_order.len() == aggregate_exprs.len()
+                        && !aggregate_exprs.is_empty();
+
                     if satisfied {
                         let increasing =
                             set_monotonicity.eq(&SetMonotonicity::Increasing);
@@ -634,11 +688,45 @@ pub fn get_window_mode(
     Ok(None)
 }
 
-fn sort_options_resolving_constant(expr: Arc<dyn PhysicalExpr>) -> Vec<PhysicalSortExpr> {
-    vec![
-        PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)),
-        PhysicalSortExpr::new(expr, SortOptions::new(true, true)),
-    ]
+/// Generates sort option variations for a given expression.
+///
+/// This function is used to handle constant columns in window operations. Since constant
+/// columns can be considered as having any ordering, we generate multiple sort options
+/// to explore different ordering possibilities.
+///
+/// # Parameters
+/// - `expr`: The physical expression to generate sort options for
+/// - `only_monotonic`: If false, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST).
+///   If true, generates only 2 options that preserve set monotonicity.
+///
+/// # When to use `only_monotonic = false`:
+/// Use for PARTITION BY columns where we want to explore all possible orderings to find
+/// one that matches the existing data ordering.
+///
+/// # When to use `only_monotonic = true`:
+/// Use for aggregate/window function arguments where set monotonicity needs to be preserved.
+/// Only generates ASC NULLS LAST and DESC NULLS FIRST because:
+/// - Set monotonicity is broken if data has increasing order but nulls come first
+/// - Set monotonicity is broken if data has decreasing order but nulls come last
+fn sort_options_resolving_constant(
+    expr: Arc<dyn PhysicalExpr>,
+    only_monotonic: bool,
+) -> Vec<PhysicalSortExpr> {
+    if only_monotonic {
+        // Generate only the 2 options that preserve set monotonicity
+        vec![
+            PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST
+            PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST
+        ]
+    } else {
+        // Generate all 4 possible sort options for partition columns
+        vec![
+            PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST
+            PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, true)), // ASC NULLS FIRST
+            PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(true, false)), // DESC NULLS LAST
+            PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST
+        ]
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index c302582344..e81662a753 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -6034,3 +6034,92 @@ LIMIT 5
 0 2 NULL NULL 0 NULL NULL
 0 3 NULL NULL 0 NULL NULL
 0 4 NULL NULL 0 NULL NULL
+
+# regression test for https://github.com/apache/datafusion/issues/17401
+query I
+WITH source AS (
+    SELECT
+        1 AS n,
+        '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
+        '' AS a9, '' AS a10, '' AS a11, '' AS a12
+)
+SELECT
+    sum(n) OVER (PARTITION BY
+        a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12
+    )
+FROM source;
+----
+1
+
+# regression test for https://github.com/apache/datafusion/issues/17401
+query I
+WITH source AS (
+    SELECT
+        1 AS n,
+        '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
+        '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
+        '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
+        '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
+        '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
+)
+SELECT
+    sum(n) OVER (PARTITION BY
+        a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
+        a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
+    )
+FROM source;
+----
+1
+
+# regression test for https://github.com/apache/datafusion/issues/17401
+query I
+WITH source AS (
+    SELECT
+        1 AS n,
+        '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
+        '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
+        '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
+        '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
+        '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
+)
+SELECT
+    sum(n) OVER (PARTITION BY
+        a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
+        a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
+    )
+FROM (
+    SELECT * FROM source
+    ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
+        a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
+);
+----
+1
+
+# regression test for https://github.com/apache/datafusion/issues/17401
+query I
+WITH source AS (
+    SELECT
+        1 AS n,
+        '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7, '' AS a8,
+        '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS a15, '' AS a16,
+        '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, '' AS a23, '' AS a24,
+        '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, '' AS a31, '' AS a32,
+        '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, '' AS a39, '' AS a40
+)
+SELECT
+    sum(n) OVER (PARTITION BY
+        a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
+        a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
+    )
+FROM (
+    SELECT * FROM source
+    WHERE a1 = '' AND a2 = '' AND a3 = '' AND a4 = '' AND a5 = '' AND a6 = '' AND a7 = '' AND a8 = ''
+        AND a9 = '' AND a10 = '' AND a11 = '' AND a12 = '' AND a13 = '' AND a14 = '' AND a15 = '' AND a16 = ''
+        AND a17 = '' AND a18 = '' AND a19 = '' AND a20 = '' AND a21 = '' AND a22 = '' AND a23 = '' AND a24 = ''
+        AND a25 = '' AND a26 = '' AND a27 = '' AND a28 = '' AND a29 = '' AND a30 = '' AND a31 = '' AND a32 = ''
+        AND a33 = '' AND a34 = '' AND a35 = '' AND a36 = '' AND a37 = '' AND a38 = '' AND a39 = '' AND a40 = ''
+    ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16, a17, a18, a19, a20,
+        a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34, a35, a36, a37, a38, a39, a40
+);
+----
+1
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to