This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 02ce571910 Push even local limits past windows (#20752)
02ce571910 is described below
commit 02ce571910e6be643b29f6b282c86ab137dbfd2d
Author: Brent Gardner <[email protected]>
AuthorDate: Fri Mar 6 14:56:40 2026 -0700
Push even local limits past windows (#20752)
## Which issue does this PR close?
- Closes #20751.
## Rationale for this change
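Described in issue #20751. The limit-past-window optimization recognized `GlobalLimitExec` but not `LocalLimitExec` as a source of a limit. As a result, a `LocalLimitExec` above a window, as occurs in distributed execution, never pushed its fetch into the sort feeding the window.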
## What changes are included in this PR?
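`get_limit` now also treats `LocalLimitExec` as a limit source and resets the limit to its `fetch` (a `LocalLimitExec` has no `skip`). Two unit tests cover the `GlobalLimitExec` and `LocalLimitExec` cases.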
## Are these changes tested?
Yes, by two new unit tests: `global_limit_pushes_past_window` and `local_limit_pushes_past_window`.
## Are there any user-facing changes?
Some queries should run faster, especially in distributed execution.
**Note: AI assistance was used in this PR**
---
Cargo.lock | 1 +
datafusion/physical-optimizer/Cargo.toml | 1 +
.../src/limit_pushdown_past_window.rs | 119 ++++++++++++++++++++-
3 files changed, 120 insertions(+), 1 deletion(-)
diff --git a/Cargo.lock b/Cargo.lock
index 38fa83dd12..9c8f2c5935 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2449,6 +2449,7 @@ dependencies = [
"datafusion-expr",
"datafusion-expr-common",
"datafusion-functions",
+ "datafusion-functions-window",
"datafusion-physical-expr",
"datafusion-physical-expr-common",
"datafusion-physical-plan",
diff --git a/datafusion/physical-optimizer/Cargo.toml
b/datafusion/physical-optimizer/Cargo.toml
index 395da10d62..38c8a7c372 100644
--- a/datafusion/physical-optimizer/Cargo.toml
+++ b/datafusion/physical-optimizer/Cargo.toml
@@ -56,5 +56,6 @@ recursive = { workspace = true, optional = true }
[dev-dependencies]
datafusion-expr = { workspace = true }
datafusion-functions = { workspace = true }
+datafusion-functions-window = { workspace = true }
insta = { workspace = true }
tokio = { workspace = true }
diff --git a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
index c23fa4faef..729b600da7 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown_past_window.rs
@@ -25,7 +25,7 @@ use datafusion_physical_expr::window::{
StandardWindowFunctionExpr, WindowExpr,
};
use datafusion_physical_plan::execution_plan::CardinalityEffect;
-use datafusion_physical_plan::limit::GlobalLimitExec;
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::sort::SortExec;
use
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
@@ -206,6 +206,12 @@ fn get_limit(node: &Arc<dyn ExecutionPlan>, ctx: &mut
TraverseState) -> bool {
ctx.reset_limit(limit.fetch().map(|fetch| fetch + limit.skip()));
return true;
}
+ // In distributed execution, GlobalLimitExec becomes LocalLimitExec
+ // per partition. Handle it the same way (LocalLimitExec has no skip).
+ if let Some(limit) = node.as_any().downcast_ref::<LocalLimitExec>() {
+ ctx.reset_limit(Some(limit.fetch()));
+ return true;
+ }
if let Some(limit) =
node.as_any().downcast_ref::<SortPreservingMergeExec>() {
ctx.reset_limit(limit.fetch());
return true;
@@ -254,3 +260,114 @@ fn bound_to_usize(bound: &WindowFrameBound) ->
Option<usize> {
_ => None,
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use datafusion_common::ScalarValue;
+ use datafusion_expr::{WindowFrame, WindowFrameBound, WindowFrameUnits};
+ use datafusion_functions_window::row_number::row_number_udwf;
+ use datafusion_physical_expr::expressions::col;
+ use datafusion_physical_expr::window::StandardWindowExpr;
+ use datafusion_physical_expr_common::sort_expr::{LexOrdering,
PhysicalSortExpr};
+ use datafusion_physical_plan::InputOrderMode;
+ use datafusion_physical_plan::displayable;
+ use datafusion_physical_plan::limit::LocalLimitExec;
+ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
+ use datafusion_physical_plan::windows::{
+ BoundedWindowAggExec, create_udwf_window_expr,
+ };
+ use insta::assert_snapshot;
+ use std::sync::Arc;
+
+ fn plan_str(plan: &dyn ExecutionPlan) -> String { // indented text rendering of the plan, compared against inline snapshots
+ displayable(plan).indent(true).to_string()
+ }
+
+ fn schema() -> Arc<Schema> { // single non-nullable Int64 column "a"
+ Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)]))
+ }
+
+ /// Builds the unoptimized plan: LocalLimitExec or GlobalLimitExec (fetch=100) →
BoundedWindowAggExec(row_number) → SortExec(a ASC, preserve_partitioning) → PlaceholderRowExec
+ fn build_window_plan(
+ use_local_limit: bool, // true: LocalLimitExec(fetch=100); false: GlobalLimitExec(skip=0, fetch=100)
+ ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+ let s = schema();
+ let input: Arc<dyn ExecutionPlan> =
+ Arc::new(PlaceholderRowExec::new(Arc::clone(&s)));
+
+ let ordering =
+ LexOrdering::new(vec![PhysicalSortExpr::new_default(col("a",
&s)?).asc()])
+ .unwrap();
+
+ let sort: Arc<dyn ExecutionPlan> = Arc::new(
+ SortExec::new(ordering.clone(),
input).with_preserve_partitioning(true),
+ );
+
+ let window_expr = Arc::new(StandardWindowExpr::new(
+ create_udwf_window_expr(
+ &row_number_udwf(),
+ &[],
+ &s,
+ "row_number".to_string(),
+ false,
+ )?,
+ &[],
+ ordering.as_ref(),
+ Arc::new(WindowFrame::new_bounds(
+ WindowFrameUnits::Rows,
+ WindowFrameBound::Preceding(ScalarValue::UInt64(None)), // UNBOUNDED PRECEDING
+ WindowFrameBound::CurrentRow,
+ )),
+ ));
+
+ let window: Arc<dyn ExecutionPlan> =
Arc::new(BoundedWindowAggExec::try_new(
+ vec![window_expr],
+ sort,
+ InputOrderMode::Sorted,
+ true,
+ )?);
+
+ let limit: Arc<dyn ExecutionPlan> = if use_local_limit {
+ Arc::new(LocalLimitExec::new(window, 100))
+ } else {
+ Arc::new(GlobalLimitExec::new(window, 0, Some(100)))
+ };
+
+ Ok(limit)
+ }
+
+ fn optimize(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> { // applies only the LimitPushPastWindows rule
+ let mut config = ConfigOptions::new();
+ config.optimizer.enable_window_limits = true; // NOTE(review): presumably the rule is gated on this flag — confirm
+ LimitPushPastWindows::new().optimize(plan, &config).unwrap()
+ }
+
+ /// GlobalLimitExec(skip=0, fetch=100) above the window: the limit node is kept and the
window's input SortExec becomes TopK(fetch=100).
+ #[test]
+ fn global_limit_pushes_past_window() {
+ let plan = build_window_plan(false).unwrap();
+ let optimized = optimize(plan);
+ assert_snapshot!(plan_str(optimized.as_ref()), @r#"
+ GlobalLimitExec: skip=0, fetch=100
+ BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64
}, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+ SortExec: TopK(fetch=100), expr=[a@0 ASC],
preserve_partitioning=[true]
+ PlaceholderRowExec
+ "#);
+ }
+
+ /// LocalLimitExec(fetch=100) above the window is handled like the global case: the limit
node is kept and the window's input SortExec also becomes TopK(fetch=100).
+ /// Motivation: in distributed execution, the limit above a window can be a LocalLimitExec
rather than a GlobalLimitExec.
+ #[test]
+ fn local_limit_pushes_past_window() {
+ let plan = build_window_plan(true).unwrap();
+ let optimized = optimize(plan);
+ assert_snapshot!(plan_str(optimized.as_ref()), @r#"
+ LocalLimitExec: fetch=100
+ BoundedWindowAggExec: wdw=[row_number: Field { "row_number": UInt64
}, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW], mode=[Sorted]
+ SortExec: TopK(fetch=100), expr=[a@0 ASC],
preserve_partitioning=[true]
+ PlaceholderRowExec
+ "#);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]