This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new c1d6f34316 Prevent exponential planning time for Window functions - v2
(#17684)
c1d6f34316 is described below
commit c1d6f343161ad7a3a9a372469fafd529d0085ada
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Sep 25 21:34:24 2025 +0300
Prevent exponential planning time for Window functions - v2 (#17684)
* fix
* Update mod.rs
* Update mod.rs
* Update mod.rs
* tests copied from v1 pr
* test case from review comment
https://github.com/apache/datafusion/pull/17684#discussion_r2366146307
* one more test case
* Update mod.rs
* Update datafusion/physical-plan/src/windows/mod.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Update datafusion/physical-plan/src/windows/mod.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Update mod.rs
* Update mod.rs
---------
Co-authored-by: Piotr Findeisen <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/benches/sql_planner.rs | 3 +-
datafusion/physical-plan/src/windows/mod.rs | 148 ++++++++++++++++++++------
datafusion/sqllogictest/test_files/window.slt | 89 ++++++++++++++++
3 files changed, 209 insertions(+), 31 deletions(-)
diff --git a/datafusion/core/benches/sql_planner.rs
b/datafusion/core/benches/sql_planner.rs
index c71191507f..9ae6e1f570 100644
--- a/datafusion/core/benches/sql_planner.rs
+++ b/datafusion/core/benches/sql_planner.rs
@@ -476,7 +476,8 @@ fn criterion_benchmark(c: &mut Criterion) {
});
});
- for partitioning_columns in [4, 7, 8] {
+ // It was observed in production that queries with window functions
sometimes partition over more than 30 columns
+ for partitioning_columns in [4, 7, 8, 12, 30] {
c.bench_function(
&format!(
"physical_window_function_partition_by_{partitioning_columns}_on_values"
diff --git a/datafusion/physical-plan/src/windows/mod.rs
b/datafusion/physical-plan/src/windows/mod.rs
index 4b4e018fd5..829cadf50a 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -371,17 +371,40 @@ pub(crate) fn window_equivalence_properties(
for (i, expr) in window_exprs.iter().enumerate() {
let partitioning_exprs = expr.partition_by();
let no_partitioning = partitioning_exprs.is_empty();
- // Collect columns defining partitioning, and construct all
`SortOptions`
- // variations for them. Then, we will check each one whether it
satisfies
- // the existing ordering provided by the input plan.
+
+ // Find "one" valid ordering for partition columns to avoid
exponential complexity.
+ // see https://github.com/apache/datafusion/issues/17401
let mut all_satisfied_lexs = vec![];
- for lex in partitioning_exprs
- .iter()
- .map(|pb_order|
sort_options_resolving_constant(Arc::clone(pb_order)))
- .multi_cartesian_product()
- .filter_map(LexOrdering::new)
- {
- if window_eq_properties.ordering_satisfy(lex.clone())? {
+ let mut candidate_ordering = vec![];
+
+ for partition_expr in partitioning_exprs.iter() {
+ let sort_options =
+ sort_options_resolving_constant(Arc::clone(partition_expr),
true);
+
+ // Try each sort option and pick the first one that works
+ let mut found = false;
+ for sort_expr in sort_options.into_iter() {
+ candidate_ordering.push(sort_expr);
+ if let Some(lex) =
LexOrdering::new(candidate_ordering.clone()) {
+ if window_eq_properties.ordering_satisfy(lex)? {
+ found = true;
+ break;
+ }
+ }
+ // This option didn't work, remove it and try the next one
+ candidate_ordering.pop();
+ }
+ // If no sort option works for this column, we can't build a valid
ordering
+ if !found {
+ candidate_ordering.clear();
+ break;
+ }
+ }
+
+ // If we successfully built an ordering for all columns, use it
+ // When there are no partition expressions, candidate_ordering will be
empty and won't be added
+ if candidate_ordering.len() == partitioning_exprs.len() {
+ if let Some(lex) = LexOrdering::new(candidate_ordering) {
all_satisfied_lexs.push(lex);
}
}
@@ -410,8 +433,10 @@ pub(crate) fn window_equivalence_properties(
// Window function results in a partial constant value in
// some ordering. Adjust the ordering equivalences
accordingly:
let new_lexs =
all_satisfied_lexs.into_iter().flat_map(|lex| {
- let new_partial_consts =
-
sort_options_resolving_constant(Arc::clone(&window_col));
+ let new_partial_consts =
sort_options_resolving_constant(
+ Arc::clone(&window_col),
+ false,
+ );
new_partial_consts.into_iter().map(move |partial| {
let mut existing = lex.clone();
@@ -467,23 +492,52 @@ pub(crate) fn window_equivalence_properties(
// utilize set-monotonicity since the set shrinks as the frame
// boundary starts "touching" the end of the table.
else if frame.is_causal() {
- let args_all_lexs = sliding_expr
- .get_aggregate_expr()
- .expressions()
- .into_iter()
- .map(sort_options_resolving_constant)
- .multi_cartesian_product();
-
- let (mut asc, mut satisfied) = (false, false);
- for order in args_all_lexs {
- if let Some(f) = order.first() {
- asc = !f.options.descending;
+ // Find one valid ordering for aggregate arguments instead
of
+ // checking all combinations
+ let aggregate_exprs =
sliding_expr.get_aggregate_expr().expressions();
+ let mut candidate_order = vec![];
+ let mut asc = false;
+
+ for (idx, expr) in aggregate_exprs.iter().enumerate() {
+ let mut found = false;
+ let sort_options =
+ sort_options_resolving_constant(Arc::clone(expr),
false);
+
+ // Try each option and pick the first that works
+ for sort_expr in sort_options.into_iter() {
+ let is_asc = !sort_expr.options.descending;
+ candidate_order.push(sort_expr);
+
+ if let Some(lex) =
LexOrdering::new(candidate_order.clone()) {
+ if window_eq_properties.ordering_satisfy(lex)?
{
+ if idx == 0 {
+ // The first column's ordering
direction determines the overall
+ // monotonicity behavior of the window
result.
+ // - If the aggregate has increasing
set monotonicity (e.g., MAX, COUNT)
+ // and the first arg is ascending,
the window result is increasing
+ // - If the aggregate has decreasing
set monotonicity (e.g., MIN)
+ // and the first arg is ascending,
the window result is also increasing
+ // This flag is used to determine the
final window column ordering.
+ asc = is_asc;
+ }
+ found = true;
+ break;
+ }
+ }
+ // This option didn't work, remove it and try the
next one
+ candidate_order.pop();
}
- if window_eq_properties.ordering_satisfy(order)? {
- satisfied = true;
+
+ // If we couldn't extend the ordering, stop trying
+ if !found {
break;
}
}
+
+ // Check if we successfully built a complete ordering
+ let satisfied = candidate_order.len() ==
aggregate_exprs.len()
+ && !aggregate_exprs.is_empty();
+
if satisfied {
let increasing =
set_monotonicity.eq(&SetMonotonicity::Increasing);
@@ -634,11 +688,45 @@ pub fn get_window_mode(
Ok(None)
}
-fn sort_options_resolving_constant(expr: Arc<dyn PhysicalExpr>) ->
Vec<PhysicalSortExpr> {
- vec![
- PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false,
false)),
- PhysicalSortExpr::new(expr, SortOptions::new(true, true)),
- ]
+/// Generates sort option variations for a given expression.
+///
+/// This function is used to handle constant columns in window operations. Since constant
+/// columns can be considered as having any ordering, we generate multiple sort options
+/// to explore different ordering possibilities.
+///
+/// The PARTITION BY and aggregate-argument callers in this module try the returned options
+/// in order and keep the first one the existing ordering satisfies, so the order of the
+/// returned vector matters.
+///
+/// # Parameters
+/// - `expr`: The physical expression to generate sort options for
+/// - `only_monotonic`: If false, generates all 4 possible sort options (ASC/DESC × NULLS FIRST/LAST).
+/// If true, generates only 2 options that preserve set monotonicity.
+///
+/// # Returns
+/// `PhysicalSortExpr`s over `expr`, in this order:
+/// - `only_monotonic = true`: `[ASC NULLS LAST, DESC NULLS FIRST]`
+/// - `only_monotonic = false`: `[ASC NULLS LAST, ASC NULLS FIRST, DESC NULLS LAST, DESC NULLS FIRST]`
+///
+/// # When to use `only_monotonic = false`:
+/// Use for PARTITION BY columns where we want to explore all possible orderings to find
+/// one that matches the existing data ordering.
+///
+/// # When to use `only_monotonic = true`:
+/// Use for aggregate/window function arguments where set monotonicity needs to be preserved.
+/// Only generates ASC NULLS LAST and DESC NULLS FIRST because:
+/// - Set monotonicity is broken if data has increasing order but nulls come first
+/// - Set monotonicity is broken if data has decreasing order but nulls come last
+///
+/// NOTE(review): the call sites in this patch pass `true` for PARTITION BY columns and
+/// `false` for aggregate arguments and for the window column, which is the reverse of the
+/// guidance above. For aggregate arguments this admits ASC NULLS FIRST / DESC NULLS LAST,
+/// the two options described above as breaking set monotonicity. Confirm whether this doc
+/// or the call sites reflect the intent.
+fn sort_options_resolving_constant(
+ expr: Arc<dyn PhysicalExpr>,
+ only_monotonic: bool,
+) -> Vec<PhysicalSortExpr> {
+ if only_monotonic {
+ // Generate only the 2 options that preserve set monotonicity
+ vec![
+ PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST
+ PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST
+ ]
+ } else {
+ // Generate all 4 possible sort options for partition columns
+ vec![
+ PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, false)), // ASC NULLS LAST
+ PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(false, true)), // ASC NULLS FIRST
+ PhysicalSortExpr::new(Arc::clone(&expr), SortOptions::new(true, false)), // DESC NULLS LAST
+ PhysicalSortExpr::new(expr, SortOptions::new(true, true)), // DESC NULLS FIRST
+ ]
+ }
}
#[cfg(test)]
diff --git a/datafusion/sqllogictest/test_files/window.slt
b/datafusion/sqllogictest/test_files/window.slt
index c302582344..e81662a753 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -6034,3 +6034,92 @@ LIMIT 5
0 2 NULL NULL 0 NULL NULL
0 3 NULL NULL 0 NULL NULL
0 4 NULL NULL 0 NULL NULL
+
+# regression test for https://github.com/apache/datafusion/issues/17401 (PARTITION BY over 12 columns)
+query I
+WITH source AS (
+ SELECT
+ 1 AS n,
+ '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7,
'' AS a8,
+ '' AS a9, '' AS a10, '' AS a11, '' AS a12
+)
+SELECT
+ sum(n) OVER (PARTITION BY
+ a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12
+ )
+FROM source;
+----
+1
+
+# regression test for https://github.com/apache/datafusion/issues/17401 (PARTITION BY over 40 columns)
+query I
+WITH source AS (
+ SELECT
+ 1 AS n,
+ '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7,
'' AS a8,
+ '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS
a15, '' AS a16,
+ '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, ''
AS a23, '' AS a24,
+ '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, ''
AS a31, '' AS a32,
+ '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, ''
AS a39, '' AS a40
+)
+SELECT
+ sum(n) OVER (PARTITION BY
+ a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16,
a17, a18, a19, a20,
+ a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34,
a35, a36, a37, a38, a39, a40
+ )
+FROM source;
+----
+1
+
+# regression test for https://github.com/apache/datafusion/issues/17401 (PARTITION BY over 40 columns, input already sorted on all partition columns)
+query I
+WITH source AS (
+ SELECT
+ 1 AS n,
+ '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7,
'' AS a8,
+ '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS
a15, '' AS a16,
+ '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, ''
AS a23, '' AS a24,
+ '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, ''
AS a31, '' AS a32,
+ '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, ''
AS a39, '' AS a40
+)
+SELECT
+ sum(n) OVER (PARTITION BY
+ a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16,
a17, a18, a19, a20,
+ a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34,
a35, a36, a37, a38, a39, a40
+ )
+FROM (
+ SELECT * FROM source
+ ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15,
a16, a17, a18, a19, a20,
+ a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34,
a35, a36, a37, a38, a39, a40
+);
+----
+1
+
+# regression test for https://github.com/apache/datafusion/issues/17401 (PARTITION BY over 40 columns, all partition columns filtered to constants and input sorted)
+query I
+WITH source AS (
+ SELECT
+ 1 AS n,
+ '' AS a1, '' AS a2, '' AS a3, '' AS a4, '' AS a5, '' AS a6, '' AS a7,
'' AS a8,
+ '' AS a9, '' AS a10, '' AS a11, '' AS a12, '' AS a13, '' AS a14, '' AS
a15, '' AS a16,
+ '' AS a17, '' AS a18, '' AS a19, '' AS a20, '' AS a21, '' AS a22, ''
AS a23, '' AS a24,
+ '' AS a25, '' AS a26, '' AS a27, '' AS a28, '' AS a29, '' AS a30, ''
AS a31, '' AS a32,
+ '' AS a33, '' AS a34, '' AS a35, '' AS a36, '' AS a37, '' AS a38, ''
AS a39, '' AS a40
+)
+SELECT
+ sum(n) OVER (PARTITION BY
+ a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15, a16,
a17, a18, a19, a20,
+ a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34,
a35, a36, a37, a38, a39, a40
+ )
+FROM (
+ SELECT * FROM source
+ WHERE a1 = '' AND a2 = '' AND a3 = '' AND a4 = '' AND a5 = '' AND a6 = ''
AND a7 = '' AND a8 = ''
+ AND a9 = '' AND a10 = '' AND a11 = '' AND a12 = '' AND a13 = '' AND
a14 = '' AND a15 = '' AND a16 = ''
+ AND a17 = '' AND a18 = '' AND a19 = '' AND a20 = '' AND a21 = '' AND
a22 = '' AND a23 = '' AND a24 = ''
+ AND a25 = '' AND a26 = '' AND a27 = '' AND a28 = '' AND a29 = '' AND
a30 = '' AND a31 = '' AND a32 = ''
+ AND a33 = '' AND a34 = '' AND a35 = '' AND a36 = '' AND a37 = '' AND
a38 = '' AND a39 = '' AND a40 = ''
+ ORDER BY a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14, a15,
a16, a17, a18, a19, a20,
+ a21, a22, a23, a24, a25, a26, a27, a28, a29, a30, a31, a32, a33, a34,
a35, a36, a37, a38, a39, a40
+);
+----
+1
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]