songkant-aws opened a new issue, #23007:
URL: https://github.com/apache/datafusion/issues/23007
# Substrait consumer fails with `DuplicateUnqualifiedField` when a plan chains two Window relations and the first window's output column is carried into the second window's input
## Which crate
`datafusion-substrait` (consumer / `from_substrait_plan`). Reproduced on
`54.0.0` (crates.io) and the code path is unchanged on `branch-54` / `main`.
## What happens
A logical plan that contains **two `WindowAggr` nodes in series**, where a
window-derived column produced by the first window survives (via the
intervening projections) into the **input schema of the second window**,
round-trips through Substrait and then fails to be consumed:
```
SchemaError(DuplicateUnqualifiedField {
    name: "avg(data.b) ORDER BY [UInt64(1) ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"
}, Some(""))
```
(With `row_number()` windows the duplicated name is
`row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`
instead — same mechanism.)
The producer side is fine: `to_substrait_plan` succeeds. The failure is
purely on the **consumer** side, when rebuilding the second `Window` relation.
## Why it happens (root cause)
Substrait intermediate `ProjectRel`/`WindowRel` nodes are **positional** —
they carry an `output_mapping` of field indices, not names. Field names live
only at `ReadRel.base_schema` and at the top-level
`Plan.relations[].root.names`. So any alias a producer attached to an
intermediate window column (`AS seq1`, `AS ravg1`, …) is **dropped** in the
Substrait IR. When the consumer rebuilds the expressions, a window function
gets its default schema name back, e.g. `avg(data.b) ... RANGE ...` or
`row_number() ROWS BETWEEN ...`.
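The alias loss can be pictured with a toy model. This is only a sketch: `WindowExpr`, `aliased`, `schema_name`, and `roundtrip_positional` are hypothetical names invented for illustration and are not DataFusion or Substrait APIs. It assumes that a positional round trip keeps the expression but not the alias attached to an intermediate column.

```rust
/// Toy window expression (hypothetical, not a DataFusion type).
#[derive(Clone, Debug)]
struct WindowExpr {
    /// Name derived from the expression itself, e.g. "row_number() ROWS BETWEEN ...".
    default_name: String,
    /// Alias attached by the producer, e.g. "rn1".
    alias: Option<String>,
}

impl WindowExpr {
    fn aliased(default_name: &str, alias: &str) -> Self {
        Self {
            default_name: default_name.to_string(),
            alias: Some(alias.to_string()),
        }
    }

    /// The name the column gets in a schema: the alias if present,
    /// otherwise the default name.
    fn schema_name(&self) -> &str {
        self.alias.as_deref().unwrap_or(self.default_name.as_str())
    }

    /// Models a positional round trip: the expression survives, the
    /// intermediate alias does not (assumption of this sketch).
    fn roundtrip_positional(&self) -> Self {
        Self {
            default_name: self.default_name.clone(),
            alias: None,
        }
    }
}
```

Under this model, two `row_number()` expressions aliased `rn1` and `rn2` have distinct schema names before the round trip and identical ones after it.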
In the consumer, `from_project_rel`
(`src/logical_plan/consumer/rel/project_rel.rs`) handles a project that
contains window expressions like this:
```rust
// project_rel.rs (54.0.0), lines ~49-82
let mut explicit_exprs: Vec<Expr> = vec![];
let mut window_exprs: HashSet<Expr> = HashSet::new();
for expr in &p.expressions {
    let e = consumer.consume_expression(expr, input.clone().schema()).await?;
    if let Expr::WindowFunction(_) = &e {
        window_exprs.insert(e.clone());
    }
    explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?); // (A)
}
let input = if !window_exprs.is_empty() {
    LogicalPlanBuilder::window_plan(input, window_exprs)? // (B) <-- error originates here
} else {
    input
};
let mut final_exprs: Vec<Expr> = vec![];
for index in 0..original_schema.fields().len() {
    let e = Expr::Column(Column::from(original_schema.qualified_field(index)));
    final_exprs.push(name_tracker.get_uniquely_named_expr(e)?); // (C)
}
final_exprs.append(&mut explicit_exprs);
project(input, final_exprs) // (D)
```
The `NameTracker` dedup at **(A)** and **(C)** only governs the names of the
**outer projection** built at **(D)**. The collision, however, happens earlier,
at **(B)**, inside `LogicalPlanBuilder::window_plan` → `.window(...)` →
`Window::try_new` (`datafusion-expr` `plan.rs`). That constructor builds
`window_fields` as the concatenation below and passes it to
`DFSchema::new_with_metadata`, whose `check_names()` raises
`DuplicateUnqualifiedField`:
```
window_fields = [ ...all input fields... ] ++ [ ...new window expr fields... ]
```
When the first window's output column has been carried down into `input`
(here named `avg(data.b) ... RANGE ...`), and the second window re-introduces
an expression with the **same default schema name** (because its alias was
lost over Substrait), the resulting `DFSchema` has two fields with identical
unqualified names → `DuplicateUnqualifiedField`.
`NameTracker` never inspects the schema *inside* `window_plan`, so it cannot
prevent this. It also does not seed itself with the input schema's existing
field names, so even at the project level an inherited window column and a
freshly-built identical one are not deduplicated against each other.
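The collision can be reproduced in miniature with plain strings. The sketch below is a simplified stand-in, not DataFusion's implementation: `window_fields` and `first_duplicate` are hypothetical helpers, and the model looks only at unqualified names, ignoring the table qualifiers that a real `DFSchema` also tracks.

```rust
use std::collections::HashSet;

/// Models the window output schema described above:
/// all input fields, followed by the new window expression fields.
/// (Hypothetical helper; names only, no types or qualifiers.)
fn window_fields(input: &[&str], new_window_exprs: &[&str]) -> Vec<String> {
    input
        .iter()
        .chain(new_window_exprs.iter())
        .map(|s| s.to_string())
        .collect()
}

/// Simplified stand-in for a duplicate-name check: returns the first
/// unqualified name that appears more than once, if any.
fn first_duplicate(fields: &[String]) -> Option<String> {
    let mut seen = HashSet::new();
    for name in fields {
        // `insert` returns false when the name was already present.
        if !seen.insert(name.as_str()) {
            return Some(name.clone());
        }
    }
    None
}
```

With an input of `["a", "b", <default window name>]` and a new window expression carrying the same default name, `first_duplicate` reports that name. With the inherited column dropped from the input, it reports nothing, which matches the contrast between the failing and passing reproductions.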
### Relationship to #15211
PR #15211 added `NameTracker` to dedup **duplicate window functions referenced
multiple times within a single project**. That fix is real but orthogonal: it
covers duplicates *within one `from_project_rel` call's explicit expressions*.
It does **not** cover the case here: an inherited window column (from a
*previous* window relation, already in the input schema) colliding with a
newly-built window expression of the same default name. The duplicate is
across the input-schema / new-window boundary inside `window_plan`, which
`NameTracker` doesn't reach.
## Minimal reproductions
The reproductions below are `datafusion-substrait` integration tests
(`datafusion/substrait/tests/cases/roundtrip_logical_plan.rs`) using the
existing `create_context()` / `roundtrip_logical_plan_with_ctx()` helpers.
### Repro 1 — two identical `row_number()` windows (fails)
```rust
#[tokio::test]
async fn chained_identical_window_functions() -> Result<()> {
    use datafusion::functions_window::expr_fn::row_number;
    let ctx = create_context().await?;
    let scan = ctx.table("data").await?.into_optimized_plan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .window(vec![row_number().alias("rn1")])?
        .window(vec![row_number().alias("rn2")])?
        .build()?;
    // FAILS: DuplicateUnqualifiedField
    roundtrip_logical_plan_with_ctx(plan, ctx).await?;
    Ok(())
}
```
### Repro 2 — seq window + sort + project-except, chained (PASSES — important contrast)
```rust
#[tokio::test]
async fn chained_seq_with_sort_and_project_except() -> Result<()> {
    use datafusion::functions_window::expr_fn::row_number;
    let ctx = create_context().await?;
    let scan = ctx.table("data").await?.into_optimized_plan()?;
    let plan = LogicalPlanBuilder::from(scan)
        .window(vec![row_number().alias("seq1")])?
        .project(vec![col("a"), col("b"), col("seq1")])?
        .sort(vec![col("seq1").sort(true, false)])?
        .project(vec![col("a"), col("b")])? // DROP seq1
        .window(vec![row_number().alias("seq2")])?
        .project(vec![col("a"), col("b"), col("seq2")])?
        .sort(vec![col("seq2").sort(true, false)])?
        .project(vec![col("a"), col("b")])? // DROP seq2
        .build()?;
    roundtrip_logical_plan_with_ctx(plan, ctx).await?; // PASSES
    Ok(())
}
```
This passes because dropping `seq1` lets the plan flatten so the two
`row_number()` columns never coexist in a single Window input schema. This
case isolates the trigger: it is **not** `sort`/`project-except` that breaks
things.
### Repro 3 — faithful shape: window emits seq + running aggregate, the aggregate is carried through (fails)
This mirrors a real chained `streamstats` plan: each window emits a sequence
column **and** a running aggregate; the running aggregate is carried through
the intervening projects (only the seq is dropped). Carrying that window
column is what keeps window #1's output alive in window #2's input.
```rust
#[tokio::test]
async fn chained_window_with_carried_running_aggregate() -> Result<()> {
    use datafusion::functions_aggregate::average::avg_udaf;
    use datafusion::functions_window::expr_fn::row_number;
    use datafusion::logical_expr::expr::WindowFunction;
    use datafusion::logical_expr::{Expr, ExprFunctionExt, WindowFrame};
    let ctx = create_context().await?;
    let scan = ctx.table("data").await?.into_optimized_plan()?;
    let running1 = Expr::from(WindowFunction::new(avg_udaf(), vec![col("b")]))
        .window_frame(WindowFrame::new(Some(false)))
        .build()?
        .alias("ravg1");
    let running2 = Expr::from(WindowFunction::new(avg_udaf(), vec![col("b")]))
        .window_frame(WindowFrame::new(Some(false)))
        .build()?
        .alias("ravg2");
    let plan = LogicalPlanBuilder::from(scan)
        .window(vec![row_number().alias("seq1"), running1])?
        .project(vec![col("a"), col("b"), col("seq1"), col("ravg1")])?
        .sort(vec![col("seq1").sort(true, false)])?
        .project(vec![col("a"), col("b"), col("ravg1")])? // drop seq1, KEEP ravg1
        .window(vec![row_number().alias("seq2"), running2])?
        .project(vec![col("a"), col("b"), col("ravg1"), col("seq2"), col("ravg2")])?
        .sort(vec![col("seq2").sort(true, false)])?
        .project(vec![col("a"), col("b"), col("ravg1"), col("ravg2")])? // drop seq2
        .build()?;
    roundtrip_logical_plan_with_ctx(plan, ctx).await?;
    // FAILS: DuplicateUnqualifiedField {
    //   name: "avg(data.b) ORDER BY [UInt64(1) ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" }
    Ok(())
}
```
## Suggested fix (direction, not a final patch)
Extend the existing `NameTracker` mechanism so it also dedups against names
that already exist in the window relation's input schema. Concretely, in
`from_project_rel`, before building `window_exprs`, seed the tracker (or apply
`get_uniquely_named_expr`) with the input schema's field names, and pass
uniquely re-aliased window expressions into `window_plan`, so an inherited
window column forces the newly-built one to receive a `__temp__N` suffix, the
same disambiguation #15211 already uses, just extended across the input-schema
boundary. This keeps the positional `output_mapping` semantics intact (the
suffix only affects internal schema names, not the emitted ordering).
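As a rough illustration of the seeding idea, here is a string-only sketch. `SeededNameTracker` and its methods are hypothetical and are not the real `NameTracker`, which operates on `Expr` values. The assumption that the suffix counter starts at 0 is also mine and may differ from the actual implementation.

```rust
use std::collections::HashSet;

/// Hypothetical, simplified tracker that knows the input schema's names
/// before any new window expression is named.
struct SeededNameTracker {
    seen: HashSet<String>,
}

impl SeededNameTracker {
    /// Seed the tracker with every field name already in the input schema.
    fn from_input_schema(input_fields: &[&str]) -> Self {
        Self {
            seen: input_fields.iter().map(|s| s.to_string()).collect(),
        }
    }

    /// Return `name` unchanged if it is unused; otherwise return the first
    /// free `name__temp__N` (N counted from 0 in this sketch).
    fn unique_name(&mut self, name: &str) -> String {
        if self.seen.insert(name.to_string()) {
            return name.to_string();
        }
        let mut n = 0;
        loop {
            let candidate = format!("{}__temp__{}", name, n);
            if self.seen.insert(candidate.clone()) {
                return candidate;
            }
            n += 1;
        }
    }
}
```

Seeded with an input schema that already holds the default `row_number()` name, the tracker hands back a suffixed name for the second window's expression, so the concatenated window schema would no longer contain two identical unqualified names.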
I'm happy to open a PR along these lines and add the tests above (the passing
one included, as a guard against "fixing" it by breaking the flatten path).
## Environment
- `datafusion` / `datafusion-substrait` `54.0.0` (also present on
`branch-54`, HEAD `08da279`, and `main`).
- Discovered via OpenSearch's PPL `streamstats` command, which lowers chained
`streamstats` into exactly this
`window → project(carry) → sort → project(drop seq)` shape and ships the plan
to a DataFusion backend over Substrait.