This is an automated email from the ASF dual-hosted git repository. avantgardner pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 25acb64358 Push the limits (#17347) 25acb64358 is described below commit 25acb643585fe4460199a8731fc94c24e79466ef Author: Brent Gardner <bgard...@squarelabs.net> AuthorDate: Fri Aug 29 11:32:46 2025 -0600 Push the limits (#17347) Add physical optimizer rule to push limits past certain window functions (part 1). --- datafusion/common/src/config.rs | 4 + datafusion/common/src/tree_node.rs | 5 + datafusion/physical-optimizer/src/lib.rs | 1 + .../src/limit_pushdown_past_window.rs | 141 +++++++++++++++++++++ datafusion/physical-optimizer/src/optimizer.rs | 7 +- datafusion/sqllogictest/test_files/explain.slt | 3 + .../sqllogictest/test_files/information_schema.slt | 2 + datafusion/sqllogictest/test_files/window.slt | 114 ++++++++++++++++- docs/source/user-guide/configs.md | 1 + 9 files changed, 272 insertions(+), 6 deletions(-) diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index bc0c47d1c3..cdd8e72a06 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -727,6 +727,10 @@ config_namespace! { /// during aggregations, if possible pub enable_topk_aggregation: bool, default = true + /// When set to true, the optimizer will attempt to push limit operations + /// past window functions, if possible + pub enable_window_limits: bool, default = true + /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase. /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. diff --git a/datafusion/common/src/tree_node.rs b/datafusion/common/src/tree_node.rs index cf51dadf6b..7af20faca2 100644 --- a/datafusion/common/src/tree_node.rs +++ b/datafusion/common/src/tree_node.rs @@ -680,6 +680,11 @@ impl<T> Transformed<T> { Self::new(data, true, TreeNodeRecursion::Continue) } + /// Wrapper for transformed data with [`TreeNodeRecursion::Stop`] statement. + pub fn complete(data: T) -> Self { + Self::new(data, true, TreeNodeRecursion::Stop) + } + /// Wrapper for unchanged data with [`TreeNodeRecursion::Continue`] statement. pub fn no(data: T) -> Self { Self::new(data, false, TreeNodeRecursion::Continue) diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index c828cc6960..2e56e2cdb3 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -39,6 +39,7 @@ pub mod optimizer; pub mod output_requirements; pub mod projection_pushdown; pub use datafusion_pruning as pruning; +mod limit_pushdown_past_window; pub mod sanity_checker; pub mod topk_aggregation; pub mod update_aggr_exprs; diff --git a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs new file mode 100644 index 0000000000..e2e5a839ef --- /dev/null +++ b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::PhysicalOptimizerRule; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::ScalarValue; +use datafusion_expr::{WindowFrameBound, WindowFrameUnits}; +use datafusion_physical_plan::execution_plan::CardinalityEffect; +use datafusion_physical_plan::limit::GlobalLimitExec; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::windows::BoundedWindowAggExec; +use datafusion_physical_plan::ExecutionPlan; +use std::cmp; +use std::sync::Arc; + +/// This rule inspects [`ExecutionPlan`]'s attempting to find fetch limits that were not pushed +/// down by `LimitPushdown` because [BoundedWindowAggExec]s were "in the way". If the window is +/// bounded by [WindowFrameUnits::Rows] then we calculate the adjustment needed to grow the limit +/// and continue pushdown. +#[derive(Default, Clone, Debug)] +pub struct LimitPushPastWindows; + +impl LimitPushPastWindows { + pub fn new() -> Self { + Self + } +} + +impl PhysicalOptimizerRule for LimitPushPastWindows { + fn optimize( + &self, + original: Arc<dyn ExecutionPlan>, + config: &ConfigOptions, + ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> { + if !config.optimizer.enable_window_limits { + return Ok(original); + } + let mut latest_limit: Option<usize> = None; + let mut latest_max = 0; + let result = original.transform_down(|node| { + // helper closure to DRY out most the early return cases + let mut reset = |node, + max: &mut usize| + -> datafusion_common::Result< + Transformed<Arc<dyn ExecutionPlan>>, + > { + latest_limit = None; + *max = 0; + Ok(Transformed::no(node)) + }; + + // traversing sides of joins will require more thought + if node.children().len() > 1 { + return reset(node, &mut latest_max); + } + + // grab the latest limit we see + if let Some(limit) = node.as_any().downcast_ref::<GlobalLimitExec>() { + latest_limit = limit.fetch().map(|fetch| fetch + limit.skip()); + latest_max = 0; + return Ok(Transformed::no(node)); + } + + // grow the limit if we hit a window function + if let Some(window) = node.as_any().downcast_ref::<BoundedWindowAggExec>() { + for expr in window.window_expr().iter() { + let frame = expr.get_window_frame(); + if frame.units != WindowFrameUnits::Rows { + return reset(node, &mut latest_max); // expression-based limits? + } + let Some(end_bound) = bound_to_usize(&frame.end_bound) else { + return reset(node, &mut latest_max); + }; + latest_max = cmp::max(end_bound, latest_max); + } + return Ok(Transformed::no(node)); + } + + // Apply the limit if we hit a sort node + if let Some(sort) = node.as_any().downcast_ref::<SortExec>() { + let latest = latest_limit.take(); + let Some(fetch) = latest else { + latest_max = 0; + return Ok(Transformed::no(node)); + }; + let fetch = match sort.fetch() { + None => fetch + latest_max, + Some(existing) => cmp::min(existing, fetch + latest_max), + }; + let sort: Arc<dyn ExecutionPlan> = Arc::new(sort.with_fetch(Some(fetch))); + latest_max = 0; + return Ok(Transformed::complete(sort)); + } + + // we can't push the limit past nodes that decrease row count + match node.cardinality_effect() { + CardinalityEffect::Equal => {} + _ => return reset(node, &mut latest_max), + } + + Ok(Transformed::no(node)) + })?; + Ok(result.data) + } + + fn name(&self) -> &str { + "LimitPushPastWindows" + } + + fn schema_check(&self) -> bool { + false // we don't change the schema + } +} + +fn bound_to_usize(bound: &WindowFrameBound) -> Option<usize> { + match bound { + WindowFrameBound::Preceding(_) => Some(0), + WindowFrameBound::CurrentRow => Some(0), + WindowFrameBound::Following(ScalarValue::UInt64(Some(scalar))) => { + Some(*scalar as usize) + } + _ => None, + } +} + +// tests: all branches are covered by sqllogictests diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index f9ad521b4f..4d00f1029d 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -37,6 +37,7 @@ use crate::topk_aggregation::TopKAggregation; use crate::update_aggr_exprs::OptimizeAggregateOrder; use crate::coalesce_async_exec_input::CoalesceAsyncExecInput; +use crate::limit_pushdown_past_window::LimitPushPastWindows; use datafusion_common::config::ConfigOptions; use datafusion_common::Result; use datafusion_physical_plan::ExecutionPlan; @@ -59,7 +60,7 @@ pub trait PhysicalOptimizerRule: Debug { /// A human readable name for this optimizer rule fn name(&self) -> &str; - /// A flag to indicate whether the physical planner should valid the rule will not + /// A flag to indicate whether the physical planner should validate that the rule will not /// change the schema of the plan after the rewriting. /// Some of the optimization rules might change the nullable properties of the schema /// and should disable the schema check. @@ -131,6 +132,10 @@ impl PhysicalOptimizer { // into an `order by max(x) limit y`. In this case it will copy the limit value down // to the aggregation, allowing it to use only y number of accumulators. Arc::new(TopKAggregation::new()), + // Tries to push limits down through window functions, growing as appropriate + // This can possibly be combined with [LimitPushdown] + // It needs to come after [EnforceSorting] + Arc::new(LimitPushPastWindows::new()), // The LimitPushdown rule tries to push limits down as far as possible, // replacing operators with fetching variants, or adding limits // past operators that support limit pushdown. diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index cccffe08ad..06965ebef0 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -241,6 +241,7 @@ physical_plan after coalesce_batches SAME TEXT AS ABOVE physical_plan after coalesce_async_exec_input SAME TEXT AS ABOVE physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], file_type=csv, has_header=true physical_plan after LimitAggregation SAME TEXT AS ABOVE +physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE @@ -321,6 +322,7 @@ physical_plan after OutputRequirements 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after LimitAggregation SAME TEXT AS ABOVE +physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE @@ -365,6 +367,7 @@ physical_plan after OutputRequirements 01)GlobalLimitExec: skip=0, fetch=10 02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after LimitAggregation SAME TEXT AS ABOVE +physical_plan after LimitPushPastWindows SAME TEXT AS ABOVE physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, file_type=parquet physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after EnsureCooperative SAME TEXT AS ABOVE diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index c87abb972e..fb2c890201 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -289,6 +289,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_dynamic_filter_pushdown true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true +datafusion.optimizer.enable_window_limits true datafusion.optimizer.expand_views_at_output false datafusion.optimizer.filter_null_join_keys false datafusion.optimizer.hash_join_single_partition_threshold 1048576 @@ -402,6 +403,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to tru datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 ca [...] datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible +datafusion.optimizer.enable_window_limits true When set to true, the optimizer will attempt to push limit operations past window functions, if possible datafusion.optimizer.expand_views_at_output false When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. datafusion.optimizer.filter_null_join_keys false When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. datafusion.optimizer.hash_join_single_partition_threshold 1048576 The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 60ce22062f..5475fd923b 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -1345,7 +1345,7 @@ physical_plan 02)--GlobalLimitExec: skip=0, fetch=5 03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING], mode=[Sorted] 04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING], mode=[Sorted] -05)--------SortExec: expr=[c9@0 DESC], preserve_partitioning=[false] +05)--------SortExec: TopK(fetch=10), expr=[c9@0 DESC], preserve_partitioning=[false] 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], file_type=csv, has_header=true query III @@ -1362,6 +1362,110 @@ SELECT 4144173353 20935849039 28472563256 4076864659 24997484146 28118515915 +# Only 1 SortExec was added, and limit 100 was turned into limit 10 +query TT +EXPLAIN SELECT + c9, + SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1, + SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 + FROM ( + SELECT c9, + FROM aggregate_test_100 + ORDER BY c9 DESC + LIMIT 100 + ) + LIMIT 5 +---- +logical_plan +01)Projection: aggregate_test_100.c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2 +02)--Limit: skip=0, fetch=5 +03)----WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]] +04)------WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]] +05)--------Sort: aggregate_test_100.c9 DESC NULLS FIRST, fetch=100 +06)----------TableScan: aggregate_test_100 projection=[c9] +physical_plan +01)ProjectionExec: expr=[c9@0 as c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2] +02)--GlobalLimitExec: skip=0, fetch=5 +03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING], mode=[Sorted] +04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING], mode=[Sorted] +05)--------SortExec: TopK(fetch=10), expr=[c9@0 DESC], preserve_partitioning=[false] +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], file_type=csv, has_header=true + +# ensure limit pushdown can handle bigger preceding instead of following +statement ok +set datafusion.optimizer.enable_window_limits = false; + +query III +SELECT + c9, + SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING) as sum1, + SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING) as sum2 + FROM aggregate_test_100 + LIMIT 5 +---- +4268716378 24997484146 8498370520 +4229654142 29012926487 12714811027 +4216440507 28743001064 16858984380 +4144173353 28472563256 20935849039 +4076864659 28118515915 24997484146 + +statement ok +set datafusion.optimizer.enable_window_limits = true; + +query III +SELECT + c9, + SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING) as sum1, + SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING) as sum2 + FROM aggregate_test_100 + LIMIT 5 +---- +4268716378 24997484146 8498370520 +4229654142 29012926487 12714811027 +4216440507 28743001064 16858984380 +4144173353 28472563256 20935849039 +4076864659 28118515915 24997484146 + +# test_window_agg_sort_reversed_plan +# Only 1 SortExec was added, limit & skip are pushed down +query TT +EXPLAIN SELECT + c9, + SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1, + SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 + FROM aggregate_test_100 + LIMIT 5 + OFFSET 5 +---- +logical_plan +01)Projection: aggregate_test_100.c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum1, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING AS sum2 +02)--Limit: skip=5, fetch=5 +03)----WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]] +04)------WindowAggr: windowExpr=[[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING]] +05)--------TableScan: aggregate_test_100 projection=[c9] +physical_plan +01)ProjectionExec: expr=[c9@0 as c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as sum1, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as sum2] +02)--GlobalLimitExec: skip=5, fetch=5 +03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING], mode=[Sorted] +04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING], mode=[Sorted] +05)--------SortExec: TopK(fetch=15), expr=[c9@0 DESC], preserve_partitioning=[false] +06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], file_type=csv, has_header=true + +query III +SELECT + c9, + SUM(c9) OVER(ORDER BY c9 ASC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum1, + SUM(c9) OVER(ORDER BY c9 DESC ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING) as sum2 + FROM aggregate_test_100 + LIMIT 5 + OFFSET 5 +---- +4061635107 29012926487 27741341640 +4015442341 28743001064 27423817254 +3998790955 28472563256 27079733310 +3959216334 28118515915 26689577379 +3862393166 27741341640 26284746231 + # test_window_agg_sort_reversed_plan_builtin query TT EXPLAIN SELECT @@ -1428,7 +1532,7 @@ physical_plan 01)ProjectionExec: expr=[c9@0 as c9, row_number() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@2 as rn1, row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@1 as rn2] 02)--GlobalLimitExec: skip=0, fetch=5 03)----BoundedWindowAggExec: wdw=[row_number() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "row_number() ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING], mode=[Sorted] -04)------SortExec: expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false] +04)------SortExec: TopK(fetch=10), expr=[c9@0 ASC NULLS LAST], preserve_partitioning=[false] 05)--------BoundedWindowAggExec: wdw=[row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING], mode=[Sorted] 06)----------SortExec: expr=[c9@0 DESC], preserve_partitioning=[false] 07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c9], file_type=csv, has_header=true @@ -1470,7 +1574,7 @@ physical_plan 01)ProjectionExec: expr=[c9@2 as c9, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@5 as sum1, sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING@3 as sum2, row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECE [...] 02)--GlobalLimitExec: skip=0, fetch=5 03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST, aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c2 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_o [...] -04)------SortExec: expr=[c9@2 ASC NULLS LAST, c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST], preserve_partitioning=[false] +04)------SortExec: TopK(fetch=10), expr=[c9@2 ASC NULLS LAST, c1@0 ASC NULLS LAST, c2@1 ASC NULLS LAST], preserve_partitioning=[false] 05)--------BoundedWindowAggExec: wdw=[row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "row_number() ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING], mode=[Sorted] 06)----------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST, aggregate_test_100.c1 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING [...] 07)------------SortExec: expr=[c9@2 DESC, c1@0 DESC], preserve_partitioning=[false] @@ -1639,7 +1743,7 @@ physical_plan 02)--GlobalLimitExec: skip=0, fetch=5 03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 FOLLO [...] 04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 F [...] -05)--------SortExec: expr=[c1@0 ASC NULLS LAST, c9@1 DESC], preserve_partitioning=[false] +05)--------SortExec: TopK(fetch=10), expr=[c1@0 ASC NULLS LAST, c9@1 DESC], preserve_partitioning=[false] 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], file_type=csv, has_header=true @@ -1683,7 +1787,7 @@ physical_plan 02)--GlobalLimitExec: skip=0, fetch=5 03)----BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 5 PRECEDING AND 1 FOLLOWING [...] 04)------BoundedWindowAggExec: wdw=[sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING: Field { name: "sum(aggregate_test_100.c9) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 1 PRECEDING AND 5 FOLLOWING", data_type: UInt64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, frame: ROWS BETWEEN 1 PRECEDING AND 5 FOL [...] -05)--------SortExec: expr=[c1@0 ASC NULLS LAST, c9@1 DESC], preserve_partitioning=[false] +05)--------SortExec: TopK(fetch=10), expr=[c1@0 ASC NULLS LAST, c9@1 DESC], preserve_partitioning=[false] 06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c9], file_type=csv, has_header=true query III diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 877a46ef47..5060bc3805 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -129,6 +129,7 @@ The following configuration settings are available: | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. [...] | datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores [...] | datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible [...] +| datafusion.optimizer.enable_window_limits | true | When set to true, the optimizer will attempt to push limit operations past window functions, if possible [...] | datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any file [...] | datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. [...] | datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level [...] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org