This is an automated email from the ASF dual-hosted git repository.

avantgardner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 25acb64358 Push the limits (#17347)
25acb64358 is described below

commit 25acb643585fe4460199a8731fc94c24e79466ef
Author: Brent Gardner <bgard...@squarelabs.net>
AuthorDate: Fri Aug 29 11:32:46 2025 -0600

    Push the limits (#17347)
    
    Add physical optimizer rule to push limits past certain window functions 
(part 1).
---
 datafusion/common/src/config.rs                    |   4 +
 datafusion/common/src/tree_node.rs                 |   5 +
 datafusion/physical-optimizer/src/lib.rs           |   1 +
 .../src/limit_pushdown_past_window.rs              | 141 +++++++++++++++++++++
 datafusion/physical-optimizer/src/optimizer.rs     |   7 +-
 datafusion/sqllogictest/test_files/explain.slt     |   3 +
 .../sqllogictest/test_files/information_schema.slt |   2 +
 datafusion/sqllogictest/test_files/window.slt      | 114 ++++++++++++++++-
 docs/source/user-guide/configs.md                  |   1 +
 9 files changed, 272 insertions(+), 6 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index bc0c47d1c3..cdd8e72a06 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -727,6 +727,10 @@ config_namespace! {
         /// during aggregations, if possible
         pub enable_topk_aggregation: bool, default = true
 
+        /// When set to true, the optimizer will attempt to push limit 
operations
+        /// past window functions, if possible
+        pub enable_window_limits: bool, default = true
+
         /// When set to true attempts to push down dynamic filters generated 
by operators into the file scan phase.
         /// For example, for a query such as `SELECT * FROM t ORDER BY 
timestamp DESC LIMIT 10`, the optimizer
         /// will attempt to push down the current top 10 timestamps that the 
TopK operator references into the file scans.
diff --git a/datafusion/common/src/tree_node.rs 
b/datafusion/common/src/tree_node.rs
index cf51dadf6b..7af20faca2 100644
--- a/datafusion/common/src/tree_node.rs
+++ b/datafusion/common/src/tree_node.rs
@@ -680,6 +680,11 @@ impl<T> Transformed<T> {
         Self::new(data, true, TreeNodeRecursion::Continue)
     }
 
+    /// Wrapper for transformed data with [`TreeNodeRecursion::Stop`] statement.
+    pub fn complete(data: T) -> Self {
+        // `true` flags the data as transformed; `Stop` is the recursion directive
+        Self::new(data, true, TreeNodeRecursion::Stop)
+    }
+
     /// Wrapper for unchanged data with [`TreeNodeRecursion::Continue`] 
statement.
     pub fn no(data: T) -> Self {
         Self::new(data, false, TreeNodeRecursion::Continue)
diff --git a/datafusion/physical-optimizer/src/lib.rs 
b/datafusion/physical-optimizer/src/lib.rs
index c828cc6960..2e56e2cdb3 100644
--- a/datafusion/physical-optimizer/src/lib.rs
+++ b/datafusion/physical-optimizer/src/lib.rs
@@ -39,6 +39,7 @@ pub mod optimizer;
 pub mod output_requirements;
 pub mod projection_pushdown;
 pub use datafusion_pruning as pruning;
+mod limit_pushdown_past_window;
 pub mod sanity_checker;
 pub mod topk_aggregation;
 pub mod update_aggr_exprs;
diff --git a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs 
b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
new file mode 100644
index 0000000000..e2e5a839ef
--- /dev/null
+++ b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::PhysicalOptimizerRule;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::ScalarValue;
+use datafusion_expr::{WindowFrameBound, WindowFrameUnits};
+use datafusion_physical_plan::execution_plan::CardinalityEffect;
+use datafusion_physical_plan::limit::GlobalLimitExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::windows::BoundedWindowAggExec;
+use datafusion_physical_plan::ExecutionPlan;
+use std::cmp;
+use std::sync::Arc;
+
+/// This rule inspects [`ExecutionPlan`]s, attempting to find fetch limits that were not
+/// pushed down by `LimitPushdown` because [`BoundedWindowAggExec`]s were "in the way".
+/// If every window frame is bounded by [`WindowFrameUnits::Rows`] then we calculate the
+/// adjustment needed to grow the limit and set it as the `fetch` of the `SortExec` below.
+#[derive(Default, Clone, Debug)]
+pub struct LimitPushPastWindows;
+
+impl LimitPushPastWindows {
+    /// Creates the rule; it is a unit struct and carries no configuration of its own.
+    pub fn new() -> Self {
+        Self
+    }
+}
+
+impl PhysicalOptimizerRule for LimitPushPastWindows {
+    /// Walks the plan top-down, remembering the most recent [`GlobalLimitExec`]
+    /// (`skip + fetch`) and the largest `FOLLOWING` end bound of any
+    /// [`BoundedWindowAggExec`] frame seen beneath it. When a [`SortExec`] is
+    /// reached, the limit grown by that bound becomes the sort's `fetch` (never
+    /// larger than a `fetch` the sort already has) and the traversal stops.
+    ///
+    /// Returns the plan untouched when `optimizer.enable_window_limits` is false.
+    fn optimize(
+        &self,
+        original: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        if !config.optimizer.enable_window_limits {
+            return Ok(original);
+        }
+        // `skip + fetch` of the closest limit above the current node, if any
+        let mut latest_limit: Option<usize> = None;
+        // largest number of following rows needed by any window frame since that limit
+        let mut latest_max = 0;
+        let result = original.transform_down(|node| {
+            // helper closure to DRY out most the early return cases
+            // (`latest_max` is passed in rather than captured because it is also
+            // written between calls to this closure)
+            let mut reset = |node,
+                             max: &mut usize|
+             -> datafusion_common::Result<
+                Transformed<Arc<dyn ExecutionPlan>>,
+            > {
+                latest_limit = None;
+                *max = 0;
+                Ok(Transformed::no(node))
+            };
+
+            // traversing sides of joins will require more thought
+            if node.children().len() > 1 {
+                return reset(node, &mut latest_max);
+            }
+
+            // grab the latest limit we see
+            if let Some(limit) = node.as_any().downcast_ref::<GlobalLimitExec>() {
+                // rows below the limit must cover both the skipped and the fetched
+                // rows; saturate so huge LIMIT/OFFSET values cannot overflow
+                latest_limit = limit
+                    .fetch()
+                    .map(|fetch| fetch.saturating_add(limit.skip()));
+                latest_max = 0;
+                return Ok(Transformed::no(node));
+            }
+
+            // grow the limit if we hit a window function
+            if let Some(window) = node.as_any().downcast_ref::<BoundedWindowAggExec>() {
+                for expr in window.window_expr().iter() {
+                    let frame = expr.get_window_frame();
+                    if frame.units != WindowFrameUnits::Rows {
+                        return reset(node, &mut latest_max); // expression-based limits?
+                    }
+                    let Some(end_bound) = bound_to_usize(&frame.end_bound) else {
+                        return reset(node, &mut latest_max);
+                    };
+                    latest_max = cmp::max(end_bound, latest_max);
+                }
+                return Ok(Transformed::no(node));
+            }
+
+            // Apply the limit if we hit a sort node
+            if let Some(sort) = node.as_any().downcast_ref::<SortExec>() {
+                let latest = latest_limit.take();
+                let Some(fetch) = latest else {
+                    latest_max = 0;
+                    return Ok(Transformed::no(node));
+                };
+                // saturating: an overflowing sum would wrap to a tiny fetch in
+                // release builds (or panic in debug builds) and drop needed rows
+                let grown = fetch.saturating_add(latest_max);
+                let fetch = match sort.fetch() {
+                    None => grown,
+                    Some(existing) => cmp::min(existing, grown),
+                };
+                let sort: Arc<dyn ExecutionPlan> = Arc::new(sort.with_fetch(Some(fetch)));
+                latest_max = 0;
+                // `complete` carries `TreeNodeRecursion::Stop`
+                return Ok(Transformed::complete(sort));
+            }
+
+            // we can't push the limit past nodes that may change the row count
+            match node.cardinality_effect() {
+                CardinalityEffect::Equal => {}
+                _ => return reset(node, &mut latest_max),
+            }
+
+            Ok(Transformed::no(node))
+        })?;
+        Ok(result.data)
+    }
+
+    /// Name under which this rule is reported.
+    fn name(&self) -> &str {
+        "LimitPushPastWindows"
+    }
+
+    /// Opts out of the planner's schema check for this rule.
+    fn schema_check(&self) -> bool {
+        false // we don't change the schema
+    }
+}
+
+/// Returns how many rows past the current row a frame end bound can reach.
+///
+/// Yields `Some(0)` for `Preceding` and `CurrentRow` bounds, the row count for a
+/// `Following` bound holding a concrete `UInt64`, and `None` for anything else
+/// (including a count that does not fit in `usize`), which tells the caller the
+/// limit cannot be grown safely.
+fn bound_to_usize(bound: &WindowFrameBound) -> Option<usize> {
+    match bound {
+        // the frame ends at or before the current row: no look-ahead needed
+        WindowFrameBound::Preceding(_) | WindowFrameBound::CurrentRow => Some(0),
+        WindowFrameBound::Following(ScalarValue::UInt64(Some(scalar))) => {
+            // checked conversion: `as usize` would silently truncate on 32-bit
+            // targets and produce a limit that is too small
+            usize::try_from(*scalar).ok()
+        }
+        _ => None,
+    }
+}
+
+// tests: all branches are covered by sqllogictests
diff --git a/datafusion/physical-optimizer/src/optimizer.rs 
b/datafusion/physical-optimizer/src/optimizer.rs
index f9ad521b4f..4d00f1029d 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -37,6 +37,7 @@ use crate::topk_aggregation::TopKAggregation;
 use crate::update_aggr_exprs::OptimizeAggregateOrder;
 
 use crate::coalesce_async_exec_input::CoalesceAsyncExecInput;
+use crate::limit_pushdown_past_window::LimitPushPastWindows;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::Result;
 use datafusion_physical_plan::ExecutionPlan;
@@ -59,7 +60,7 @@ pub trait PhysicalOptimizerRule: Debug {
     /// A human readable name for this optimizer rule
     fn name(&self) -> &str;
 
-    /// A flag to indicate whether the physical planner should valid the rule 
will not
+    /// A flag to indicate whether the physical planner should validate that 
the rule will not
     /// change the schema of the plan after the rewriting.
     /// Some of the optimization rules might change the nullable properties of 
the schema
     /// and should disable the schema check.
@@ -131,6 +132,10 @@ impl PhysicalOptimizer {
             // into an `order by max(x) limit y`. In this case it will copy 
the limit value down
             // to the aggregation, allowing it to use only y number of 
accumulators.
             Arc::new(TopKAggregation::new()),
+            // Tries to push limits down through window functions, growing as 
appropriate
+            // This can possibly be combined with [LimitPushdown]
+            // It needs to come after [EnforceSorting]
+            Arc::new(LimitPushPastWindows::new()),
             // The LimitPushdown rule tries to push limits down as far as 
possible,
             // replacing operators with fetching variants, or adding limits
             // past operators that support limit pushdown.
diff --git a/datafusion/sqllogictest/test_files/explain.slt 
b/datafusion/sqllogictest/test_files/explain.slt
index cccffe08ad..06965ebef0 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -241,6 +241,7 @@ physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after EnsureCooperative SAME TEXT AS ABOVE
@@ -321,6 +322,7 @@ physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), 
Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after EnsureCooperative SAME TEXT AS ABOVE
@@ -365,6 +367,7 @@ physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
+physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after EnsureCooperative SAME TEXT AS ABOVE
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt 
b/datafusion/sqllogictest/test_files/information_schema.slt
index c87abb972e..fb2c890201 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -289,6 +289,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit 
true
 datafusion.optimizer.enable_dynamic_filter_pushdown true
 datafusion.optimizer.enable_round_robin_repartition true
 datafusion.optimizer.enable_topk_aggregation true
+datafusion.optimizer.enable_window_limits true
 datafusion.optimizer.expand_views_at_output false
 datafusion.optimizer.filter_null_join_keys false
 datafusion.optimizer.hash_join_single_partition_threshold 1048576
@@ -402,6 +403,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit 
true When set to tru
 datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true 
attempts to push down dynamic filters generated by operators into the file scan 
phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp 
DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 
timestamps that the TopK operator references into the file scans. This means 
that if we already have 10 timestamps in the year 2025 any files that only have 
timestamps in the year 2024 ca [...]
 datafusion.optimizer.enable_round_robin_repartition true When set to true, the 
physical plan optimizer will try to add round robin repartitioning to increase 
parallelism to leverage more CPU cores
 datafusion.optimizer.enable_topk_aggregation true When set to true, the 
optimizer will attempt to perform limit operations during aggregations, if 
possible
+datafusion.optimizer.enable_window_limits true When set to true, the optimizer 
will attempt to push limit operations past window functions, if possible
 datafusion.optimizer.expand_views_at_output false When set to true, if the 
returned type is a view type then the output will be coerced to a non-view. 
Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
 datafusion.optimizer.filter_null_join_keys false When set to true, the 
optimizer will insert filters before a join between a nullable and non-nullable 
column to filter out nulls on the nullable side. This filter can add additional 
overhead when the file format does not fully support predicate push down.
 datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum 
estimated size in bytes for one input side of a HashJoin will be collected into 
a single partition
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 60ce22062f..5475fd923b 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -1345,7 +1345,7 @@ physical_plan
 02)--GlobalLimitExec: skip=0, fetch=5
 03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }, frame: ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING], 
mode=[Sorted]
 04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING], 
mode=[Sorted]
-05)--------SortExec: expr=[c9@0 DESC], preserve_partitioning=[false]
+05)--------SortExec: TopK(fetch=10), expr=[c9@0 DESC], 
preserve_partitioning=[false]
 06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
file_type=csv, has_header=true
 
 query III
@@ -1362,6 +1362,110 @@ SELECT
 4144173353 20935849039 28472563256
 4076864659 24997484146 28118515915
 
+# Only 1 SortExec was added, and limit 100 was turned into limit 10
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum1,
+    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2
+    FROM (
+        SELECT c9,
+        FROM aggregate_test_100
+        ORDER BY c9 DESC
+        LIMIT 100
+    )
+    LIMIT 5
+----
+logical_plan
+01)Projection: aggregate_test_100.c9, sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING 
AS sum1, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+02)--Limit: skip=0, fetch=5
+03)----WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING]]
+04)------WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING]]
+05)--------Sort: aggregate_test_100.c9 DESC NULLS FIRST, fetch=100
+06)----------TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+01)ProjectionExec: expr=[c9@0 as c9, sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@2 as sum1, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]
+02)--GlobalLimitExec: skip=0, fetch=5
+03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }, frame: ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING], 
mode=[Sorted]
+04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING], 
mode=[Sorted]
+05)--------SortExec: TopK(fetch=10), expr=[c9@0 DESC], 
preserve_partitioning=[false]
+06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
file_type=csv, has_header=true
+
+# ensure limit pushdown can handle bigger preceding instead of following
+statement ok
+set datafusion.optimizer.enable_window_limits = false;
+
+query III
+SELECT
+    c9,
+    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING) as 
sum1,
+    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING) as 
sum2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+4268716378 24997484146 8498370520
+4229654142 29012926487 12714811027
+4216440507 28743001064 16858984380
+4144173353 28472563256 20935849039
+4076864659 28118515915 24997484146
+
+statement ok
+set datafusion.optimizer.enable_window_limits = true;
+
+query III
+SELECT
+    c9,
+    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING) as 
sum1,
+    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING) as 
sum2
+    FROM aggregate_test_100
+    LIMIT 5
+----
+4268716378 24997484146 8498370520
+4229654142 29012926487 12714811027
+4216440507 28743001064 16858984380
+4144173353 28472563256 20935849039
+4076864659 28118515915 24997484146
+
+# test_window_agg_sort_reversed_plan
+# Only 1 SortExec was added, limit & skip are pushed down
+query TT
+EXPLAIN SELECT
+    c9,
+    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum1,
+    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2
+    FROM aggregate_test_100
+    LIMIT 5
+    OFFSET 5
+----
+logical_plan
+01)Projection: aggregate_test_100.c9, sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING 
AS sum1, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2
+02)--Limit: skip=5, fetch=5
+03)----WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING]]
+04)------WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING]]
+05)--------TableScan: aggregate_test_100 projection=[c9]
+physical_plan
+01)ProjectionExec: expr=[c9@0 as c9, sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@2 as sum1, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2]
+02)--GlobalLimitExec: skip=5, fetch=5
+03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }, frame: ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING], 
mode=[Sorted]
+04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING], 
mode=[Sorted]
+05)--------SortExec: TopK(fetch=15), expr=[c9@0 DESC], 
preserve_partitioning=[false]
+06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
file_type=csv, has_header=true
+
+query III
+SELECT
+    c9,
+    SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum1,
+    SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as 
sum2
+    FROM aggregate_test_100
+    LIMIT 5
+    OFFSET 5
+----
+4061635107 29012926487 27741341640
+4015442341 28743001064 27423817254
+3998790955 28472563256 27079733310
+3959216334 28118515915 26689577379
+3862393166 27741341640 26284746231
+
 # test_window_agg_sort_reversed_plan_builtin
 query TT
 EXPLAIN SELECT
@@ -1428,7 +1532,7 @@ physical_plan
 01)ProjectionExec: expr=[c9@0 as c9, row_number() ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@2 as rn1, row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2]
 02)--GlobalLimitExec: skip=0, fetch=5
 03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [aggregate_test_100.c9 
ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: 
"row_number() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 
PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, 
dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING], mode=[Sorted]
-04)------SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false]
+04)------SortExec: TopK(fetch=10), expr=[c9@0 ASC NULLS LAST], 
preserve_partitioning=[false]
 05)--------BoundedWindowAggExec: wdw=[row_number() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Field { name: "row_number() ORDER BY [aggregate_test_100.c9 DESC 
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: 
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING], mode=[Sorted]
 06)----------SortExec: expr=[c9@0 DESC], preserve_partitioning=[false]
 07)------------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], 
file_type=csv, has_header=true
@@ -1470,7 +1574,7 @@ physical_plan
 01)ProjectionExec: expr=[c9@2 as c9, sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING@5 as sum1, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 
DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 
PRECEDING AND 5 FOLLOWING@3 as sum2, row_number() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECE [...]
 02)--GlobalLimitExec: skip=0, fetch=5
 03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: 
Field { name: "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC 
NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC 
NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, 
nullable: true, dict_id: 0, dict_is_o [...]
-04)------SortExec: expr=[c9@2 ASC NULLS LAST, c1@0 ASC NULLS LAST, c2@1 ASC 
NULLS LAST], preserve_partitioning=[false]
+04)------SortExec: TopK(fetch=10), expr=[c9@2 ASC NULLS LAST, c1@0 ASC NULLS 
LAST, c2@1 ASC NULLS LAST], preserve_partitioning=[false]
 05)--------BoundedWindowAggExec: wdw=[row_number() ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING: Field { name: "row_number() ORDER BY [aggregate_test_100.c9 DESC 
NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: 
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING], mode=[Sorted]
 06)----------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: 
"sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, 
aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING [...]
 07)------------SortExec: expr=[c9@2 DESC, c1@0 DESC], 
preserve_partitioning=[false]
@@ -1639,7 +1743,7 @@ physical_plan
 02)--GlobalLimitExec: skip=0, fetch=5
 03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 
PRECEDING AND 5 FOLLO [...]
 04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY 
[aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] 
ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: 
"sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, 
aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 
FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: 
false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 F [...]
-05)--------SortExec: expr=[c1@0 ASC NULLS LAST, c9@1 DESC], 
preserve_partitioning=[false]
+05)--------SortExec: TopK(fetch=10), expr=[c1@0 ASC NULLS LAST, c9@1 DESC], 
preserve_partitioning=[false]
 06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c9], file_type=csv, has_header=true
 
 
@@ -1683,7 +1787,7 @@ physical_plan
 02)--GlobalLimitExec: skip=0, fetch=5
 03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS 
LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 5 
PRECEDING AND 1 FOLLOWING [...]
 04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) PARTITION BY 
[aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS 
BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) 
PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS 
FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: 
true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 
PRECEDING AND 5 FOL [...]
-05)--------SortExec: expr=[c1@0 ASC NULLS LAST, c9@1 DESC], 
preserve_partitioning=[false]
+05)--------SortExec: TopK(fetch=10), expr=[c1@0 ASC NULLS LAST, c9@1 DESC], 
preserve_partitioning=[false]
 06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, 
c9], file_type=csv, has_header=true
 
 query III
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index 877a46ef47..5060bc3805 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -129,6 +129,7 @@ The following configuration settings are available:
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit             | 
true                      | When set to true, the optimizer will push a limit 
operation into grouped aggregations which have no aggregate expressions, as a 
soft limit, emitting groups once the limit is reached, before all rows in the 
group are read.                                                                 
                                                                                
                       [...]
 | datafusion.optimizer.enable_round_robin_repartition                     | 
true                      | When set to true, the physical plan optimizer will 
try to add round robin repartitioning to increase parallelism to leverage more 
CPU cores                                                                       
                                                                                
                                                                                
                   [...]
 | datafusion.optimizer.enable_topk_aggregation                            | 
true                      | When set to true, the optimizer will attempt to 
perform limit operations during aggregations, if possible                       
                                                                                
                                                                                
                                                                                
                     [...]
+| datafusion.optimizer.enable_window_limits                               | 
true                      | When set to true, the optimizer will attempt to 
push limit operations past window functions, if possible                        
                                                                                
                                                                                
                                                                                
                     [...]
 | datafusion.optimizer.enable_dynamic_filter_pushdown                     | 
true                      | When set to true attempts to push down dynamic 
filters generated by operators into the file scan phase. For example, for a 
query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer 
will attempt to push down the current top 10 timestamps that the TopK operator 
references into the file scans. This means that if we already have 10 
timestamps in the year 2025 any file [...]
 | datafusion.optimizer.filter_null_join_keys                              | 
false                     | When set to true, the optimizer will insert filters 
before a join between a nullable and non-nullable column to filter out nulls on 
the nullable side. This filter can add additional overhead when the file format 
does not fully support predicate push down.                                     
                                                                                
                 [...]
 | datafusion.optimizer.repartition_aggregations                           | 
true                      | Should DataFusion repartition data using the 
aggregate keys to execute aggregates in parallel using the provided 
`target_partitions` level                                                       
                                                                                
                                                                                
                                    [...]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datafusion.apache.org
For additional commands, e-mail: commits-help@datafusion.apache.org


Reply via email to