yonatan-sevenai commented on code in PR #21375:
URL: https://github.com/apache/datafusion/pull/21375#discussion_r3255110337
##########
datafusion/sql/src/unparser/plan.rs:
##########
@@ -680,8 +711,219 @@ impl Unparser<'_> {
if self.dialect.unnest_as_lateral_flatten() {
Self::collect_flatten_aliases(p.input.as_ref(), select);
}
- self.reconstruct_select_statement(plan, p, select)?;
- self.select_to_sql_recursively(p.input.as_ref(), query, select, relation)
+ // Walk down through consecutive Sort/Limit nodes, greedily
+ // absorbing what can be folded into the SELECT we're
+ // building around the Aggregate. A single SQL SELECT can
+ // carry at most one `ORDER BY` (applied before `LIMIT`),
+ // so the safe shape between us and the Aggregate is
+ // `Limit* Sort?` (outer→inner). We stop at the first node
+ // that would violate this; that node becomes the
+ // subquery boundary, and recursion (seeing
+ // `already_projected = true`) wraps it in a derived
+ // relation. If we walk all the way to a non-Sort/non-Limit
+ // terminator, the entire chain folds into one SELECT.
+ //
+ // Stacked Sorts with nothing between them collapse to the
+ // outermost — the same simplification `EnforceSorting`
+ // applies on the physical side — but only when no Limit
+ // has been absorbed since the previous Sort, since the
+ // inner Sort would otherwise be determining which rows
+ // the Limit keeps.
+ //
+ // The fold is collected here without touching `query`
+ // (apart from non-literal direct Limits, which don't
+ // depend on projection form). Once we know whether every
+ // Sort/Limit was absorbed we can pick the right
+ // projection form and emit `ORDER BY` with or without
+ // unprojection.
+ let mut cur = p.input.as_ref();
+ let mut absorbed_sort: Option<&Sort> = None;
+ let mut combined_skip: usize = 0;
+ let mut combined_fetch: Option<usize> = None;
+ let mut have_combined_limit = false;
+ let mut have_direct_limit = false;
+ let mut have_order_by = false;
+ loop {
+ match cur {
+ LogicalPlan::Limit(limit) => {
+ if have_order_by {
+ // Limit-below-Sort: `ORDER BY … LIMIT N`
+ // would apply the sort first, but the
+ // logical plan applies the Limit first.
+ break;
+ }
+ let skip_lit = limit.get_skip_type()?;
+ let fetch_lit = limit.get_fetch_type()?;
+ match (skip_lit, fetch_lit) {
+ (SkipType::Literal(s), FetchType::Literal(f)) => {
+ if have_direct_limit {
+ break;
+ }
+ if have_combined_limit {
+ // outer = already-accumulated;
+ // inner = this Limit. Same merge
+ // rule as the optimizer.
+ let (cs, cf) = combine_limit(
+ combined_skip,
+ combined_fetch,
+ s,
+ f,
+ );
+ combined_skip = cs;
+ combined_fetch = cf;
+ } else {
+ combined_skip = s;
+ combined_fetch = f;
+ have_combined_limit = true;
+ }
+ }
+ _ => {
+ if have_combined_limit || have_direct_limit {
+ // Cannot safely merge a
+ // non-literal Limit with a prior
+ // one; let recursion handle it.
+ break;
+ }
+ let Some(query_ref) = query.as_mut() else {
+ return internal_err!(
+ "Limit operator only valid in a statement context."
+ );
+ };
+ if let Some(fetch) = &limit.fetch {
+ query_ref.limit(Some(self.expr_to_sql(fetch)?));
+ }
+ if let Some(skip) = &limit.skip {
+ query_ref.offset(Some(ast::Offset {
+ rows: ast::OffsetRows::None,
+ value: self.expr_to_sql(skip)?,
+ }));
+ }
+ have_direct_limit = true;
+ }
+ }
+ cur = limit.input.as_ref();
+ }
+ LogicalPlan::Sort(sort) => {
+ if have_order_by {
+ if sort.fetch.is_some() {
+ // Inner Sort with `fetch` acts as
+ // Sort+Limit; dropping it would lose
+ // the hidden Limit.
+ break;
+ }
+ // Outer Sort already absorbed; the inner
+ // Sort is reordered by it and is
+ // conventionally dropped — matches
+ // `EnforceSorting` on the physical side.
+ cur = sort.input.as_ref();
+ continue;
+ }
+ // First Sort. Capture `fetch` as a combined
+ // Limit (since `Sort { fetch }` is equivalent
+ // to Sort + Limit). Defer emitting `ORDER BY`
+ // until we know whether the chain fully
+ // absorbed.
+ if let Some(fetch) = sort.fetch
+ && !have_direct_limit
Review Comment:
Thanks for catching this.
I simplified the state machine by splitting the `Sort { fetch }` arm from the plain `Sort` arm:
- `Sort { fetch }` is logically `Limit(fetch) -> Sort`. The new arm makes that explicit: it runs the virtual Limit checks first (Limit-below-Sort, can't-combine-with-direct-limit), and only proceeds to absorb the Sort if the virtual Limit was absorbable. If either check fails we stop without touching anything, so the derived subquery boundary lands at the Sort node and both limits survive.
- `Sort` (no fetch) is the plain case: drop the inner Sort when an outer one was already absorbed, matching `EnforceSorting`.
For `Projection -> Limit(non-literal) -> Sort { fetch = 5 } -> Aggregate`,
the emitted SQL is now:
```sql
SELECT __agg_0 AS "..." FROM (
SELECT max(...) AS __agg_0 FROM ... GROUP BY ... ORDER BY ... LIMIT 5
) LIMIT CAST(7 AS INTEGER)
```
which preserves both limits. Added a regression test for the exact case
you described.
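
To make the control flow concrete, here is a minimal, self-contained sketch of the split arms. It is a toy model, not the code in the PR: `Node`, `Fold`, and `walk` are hypothetical names, the `literal` flag stands in for the `SkipType::Literal`/`FetchType::Literal` check, and the actual limit merging and SQL emission are left out.

```rust
// Toy stand-in for the relevant LogicalPlan variants (hypothetical types).
#[derive(Debug)]
enum Node {
    Limit { literal: bool, input: Box<Node> },
    Sort { fetch: Option<usize>, input: Box<Node> },
    Aggregate,
}

// What the walk has absorbed into the SELECT so far.
#[derive(Debug, Default, PartialEq)]
struct Fold {
    have_order_by: bool,
    have_combined_limit: bool,
    have_direct_limit: bool,
}

// Walks from the Projection's input toward the Aggregate. Returns the fold
// state and the node where the walk stopped (the subquery boundary, or the
// Aggregate if the whole chain folded).
fn walk(mut cur: &Node) -> (Fold, &Node) {
    let mut st = Fold::default();
    loop {
        match cur {
            Node::Limit { literal, input } => {
                if st.have_order_by {
                    break; // Limit-below-Sort cannot share the SELECT
                }
                if *literal {
                    if st.have_direct_limit {
                        break;
                    }
                    st.have_combined_limit = true;
                } else {
                    if st.have_combined_limit || st.have_direct_limit {
                        break;
                    }
                    st.have_direct_limit = true;
                }
                cur = &**input;
            }
            // `Sort { fetch }` behaves like `Limit(fetch) -> Sort`: run the
            // virtual Limit checks first and absorb nothing if one fails.
            Node::Sort { fetch: Some(_), input } => {
                if st.have_order_by || st.have_direct_limit {
                    break;
                }
                st.have_combined_limit = true;
                st.have_order_by = true;
                cur = &**input;
            }
            // Plain Sort: absorb the first one; an inner Sort under an
            // already-absorbed outer Sort is simply skipped.
            Node::Sort { fetch: None, input } => {
                st.have_order_by = true;
                cur = &**input;
            }
            Node::Aggregate => break,
        }
    }
    (st, cur)
}
```

In this model, `Limit(non-literal) -> Sort { fetch = 5 } -> Aggregate` stops at the Sort with only `have_direct_limit` set, which corresponds to the derived-subquery shape in the SQL above.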
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]