songkant-aws opened a new issue, #23007:
URL: https://github.com/apache/datafusion/issues/23007

   # Substrait consumer fails with `DuplicateUnqualifiedField` when a plan chains two Window relations and the first window's output column is carried into the second window's input
   
   ## Which crate
   
   `datafusion-substrait` (consumer / `from_substrait_plan`). Reproduced on
   `54.0.0` (crates.io); the code path is unchanged on `branch-54` / `main`.
   
   ## What happens
   
   A logical plan that contains **two `WindowAggr` nodes in series**, where a
   window-derived column produced by the first window survives (via the
   intervening projections) into the **input schema of the second window**,
   converts to Substrait without error but then fails to be consumed:
   
   ```
   SchemaError(DuplicateUnqualifiedField {
       name: "avg(data.b) ORDER BY [UInt64(1) ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"
   }, Some(""))
   ```
   
   (With `row_number()` windows, the duplicated name is instead
   `row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`;
   the mechanism is the same.)
   
   The producer side is fine: `to_substrait_plan` succeeds. The failure is purely
   on the **consumer** side, when rebuilding the second `Window` relation.
   
   ## Why it happens (root cause)
   
   Substrait intermediate `ProjectRel`/`WindowRel` nodes are **positional**: they
   carry an `output_mapping` of field indices, not names. Field names live only at
   `ReadRel.base_schema` and at the top-level `Plan.relations[].root.names`. So
   any alias a producer attached to an intermediate window column (`AS seq1`,
   `AS ravg1`, …) is **dropped** in the Substrait IR. When the consumer rebuilds
   the expressions, a window function gets its default schema name back, e.g.
   `avg(data.b) ... RANGE ...` or `row_number() ROWS BETWEEN ...`.
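To make the alias loss concrete, here is a small, self-contained model in plain Rust. It does not use DataFusion or Substrait APIs: `WindowExpr`, `schema_name`, and `after_substrait_roundtrip` are hypothetical names, and the default-name format only loosely imitates DataFusion's display names. What it illustrates is that two window expressions distinguishable only by their aliases become indistinguishable once the alias is stripped.

```rust
/// Hypothetical, simplified view of a window expression as the consumer
/// rebuilds it. `alias` stands for a producer-side `AS ravg1`, which a
/// positional intermediate relation has no place to store.
#[derive(Clone, Debug, PartialEq)]
struct WindowExpr {
    func: String,
    args: Vec<String>,
    frame: String,
    alias: Option<String>,
}

impl WindowExpr {
    fn new(func: &str, args: &[&str], frame: &str) -> Self {
        Self {
            func: func.to_string(),
            args: args.iter().map(|a| a.to_string()).collect(),
            frame: frame.to_string(),
            alias: None,
        }
    }

    /// Attach a producer-side alias.
    fn alias(mut self, name: &str) -> Self {
        self.alias = Some(name.to_string());
        self
    }

    /// The alias if there is one, otherwise a default name derived from the
    /// expression itself (format only loosely imitates DataFusion's).
    fn schema_name(&self) -> String {
        match &self.alias {
            Some(alias) => alias.clone(),
            None => format!("{}({}) {}", self.func, self.args.join(", "), self.frame),
        }
    }

    /// Everything structural survives the trip through a positional
    /// intermediate relation; the alias does not.
    fn after_substrait_roundtrip(&self) -> Self {
        Self { alias: None, ..self.clone() }
    }
}
```

With `ravg1` and `ravg2` both reduced to the same default `avg(data.b) RANGE ...` name, any schema that holds both columns side by side ends up with two identical unqualified names.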
   
   In the consumer, `from_project_rel`
   (`src/logical_plan/consumer/rel/project_rel.rs`) handles a project that
   contains window expressions like this:
   
   ```rust
   // project_rel.rs (54.0.0), lines ~49-82
   let mut explicit_exprs: Vec<Expr> = vec![];
   let mut window_exprs: HashSet<Expr> = HashSet::new();
   for expr in &p.expressions {
       let e = consumer.consume_expression(expr, input.clone().schema()).await?;
       if let Expr::WindowFunction(_) = &e {
           window_exprs.insert(e.clone());
       }
       explicit_exprs.push(name_tracker.get_uniquely_named_expr(e)?);   // (A)
   }
   
   let input = if !window_exprs.is_empty() {
       LogicalPlanBuilder::window_plan(input, window_exprs)?            // (B) <-- error originates here
   } else {
       input
   };
   
   let mut final_exprs: Vec<Expr> = vec![];
   for index in 0..original_schema.fields().len() {
       let e = Expr::Column(Column::from(original_schema.qualified_field(index)));
       final_exprs.push(name_tracker.get_uniquely_named_expr(e)?);      // (C)
   }
   final_exprs.append(&mut explicit_exprs);
   project(input, final_exprs)                                         // (D)
   ```
   
   The `NameTracker` dedup at **(A)** and **(C)** only governs the names of the
   **outer projection** built at **(D)**. The collision, however, happens earlier,
   at **(B)**, inside `LogicalPlanBuilder::window_plan` → `.window(...)` →
   `Window::try_new` (`datafusion-expr` `plan.rs`), which builds `window_fields` as

   ```
   [ ...all input fields... ] ++ [ ...new window expr fields... ]
   ```

   and passes it to `DFSchema::new_with_metadata`, whose `check_names()` raises
   `DuplicateUnqualifiedField`.
   
   When the first window's output column has been carried into `input` (here
   named `avg(data.b) ... RANGE ...`), and the second window re-introduces an
   expression with the **same default schema name** (because its alias was lost
   over Substrait), the resulting `DFSchema` has two fields with identical
   unqualified names, and `check_names()` fails with `DuplicateUnqualifiedField`.
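The failing step at **(B)** can be sketched as follows. This is a simplified model, not `Window::try_new` itself: fields are plain strings (qualifiers are ignored), and `window_schema` / `check_unqualified_names` are hypothetical helpers that mirror only the "input fields first, then window fields, then reject duplicate unqualified names" behavior described above.

```rust
use std::collections::HashSet;

/// Mirrors the name of the error reported in the issue (hypothetical type).
#[derive(Debug, PartialEq)]
enum SchemaError {
    DuplicateUnqualifiedField { name: String },
}

/// Reject the first unqualified name that appears twice. This models only the
/// unqualified-duplicate rule, not the full schema validation.
fn check_unqualified_names(fields: &[String]) -> Result<(), SchemaError> {
    let mut seen = HashSet::new();
    for name in fields {
        if !seen.insert(name.as_str()) {
            return Err(SchemaError::DuplicateUnqualifiedField { name: name.clone() });
        }
    }
    Ok(())
}

/// Simplified model of how the window's output schema is assembled:
/// all input fields first, then one field per new window expression.
fn window_schema(input_fields: &[&str], window_fields: &[&str]) -> Result<Vec<String>, SchemaError> {
    let fields: Vec<String> = input_fields
        .iter()
        .chain(window_fields)
        .map(|name| name.to_string())
        .collect();
    check_unqualified_names(&fields)?;
    Ok(fields)
}
```

Calling `window_schema(&["a", "b"], &["row_number()", avg])` corresponds to window #1, whose input has no window columns and succeeds; calling it with `avg` also present in the input corresponds to window #2 in Repro 3 and returns the duplicate-field error.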
   
   `NameTracker` never inspects the schema *inside* `window_plan`, so it cannot
   prevent this. It also does not seed itself with the input schema's existing
   field names, so even at the project level an inherited window column and a
   freshly built identical one are not deduplicated against each other.
   
   ### Relationship to #15211
   
   PR #15211 added `NameTracker` to dedup **duplicate window functions referenced
   multiple times within a single project**. That fix is real but orthogonal: it
   covers duplicates *within one `from_project_rel` call's explicit expressions*.
   It does **not** cover the case here, where an inherited window column (from a
   *previous* window relation, already in the input schema) collides with a
   newly built window expression of the same default name. The duplicate is
   across the input-schema / new-window boundary inside `window_plan`, which
   `NameTracker` doesn't reach.
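The scope difference can be shown with a toy tracker. `NameTracker` below is a hypothetical stand-in loosely modeled on the consumer's tracker, not its real API: the first occurrence of a name is kept and later ones get a `__temp__N` suffix (numbering from 0 is an assumption). Deduplication is applied only to one project's explicit expressions, so duplicates inside that list are renamed, while a new expression whose name already exists in the input schema passes through unchanged.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in, loosely modeled on the consumer's `NameTracker`:
/// the first occurrence of a name is kept, later ones get `__temp__N`.
#[derive(Default)]
struct NameTracker {
    seen: HashSet<String>,
}

impl NameTracker {
    fn unique_name(&mut self, name: &str) -> String {
        if self.seen.insert(name.to_string()) {
            return name.to_string();
        }
        let mut n = 0;
        loop {
            let candidate = format!("{name}__temp__{n}");
            if self.seen.insert(candidate.clone()) {
                return candidate;
            }
            n += 1;
        }
    }
}

/// Dedup scoped to one project's explicit expressions: a fresh tracker per
/// call that is never told about the input schema's names.
fn dedup_explicit_exprs(explicit: &[&str]) -> Vec<String> {
    let mut tracker = NameTracker::default();
    explicit.iter().map(|name| tracker.unique_name(name)).collect()
}

/// True if a (deduplicated) new name still clashes with an input field.
fn collides_with_input(input_fields: &[&str], new_names: &[String]) -> bool {
    new_names.iter().any(|name| input_fields.contains(&name.as_str()))
}
```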
   
   ## Minimal reproductions
   
   The reproductions below are `datafusion-substrait` integration tests
   (`datafusion/substrait/tests/cases/roundtrip_logical_plan.rs`) using the
   existing `create_context()` / `roundtrip_logical_plan_with_ctx()` helpers.
   
   ### Repro 1: two identical `row_number()` windows (fails)
   
   ```rust
   #[tokio::test]
   async fn chained_identical_window_functions() -> Result<()> {
       use datafusion::functions_window::expr_fn::row_number;
       let ctx = create_context().await?;
       let scan = ctx.table("data").await?.into_optimized_plan()?;
       let plan = LogicalPlanBuilder::from(scan)
           .window(vec![row_number().alias("rn1")])?
           .window(vec![row_number().alias("rn2")])?
           .build()?;
       roundtrip_logical_plan_with_ctx(plan, ctx).await?;   // FAILS: DuplicateUnqualifiedField
       Ok(())
   }
   ```
   
   ### Repro 2: seq window + sort + project-except, chained (PASSES; important contrast)
   
   ```rust
   #[tokio::test]
   async fn chained_seq_with_sort_and_project_except() -> Result<()> {
       use datafusion::functions_window::expr_fn::row_number;
       let ctx = create_context().await?;
       let scan = ctx.table("data").await?.into_optimized_plan()?;
       let plan = LogicalPlanBuilder::from(scan)
           .window(vec![row_number().alias("seq1")])?
           .project(vec![col("a"), col("b"), col("seq1")])?
           .sort(vec![col("seq1").sort(true, false)])?
           .project(vec![col("a"), col("b")])?              // DROP seq1
           .window(vec![row_number().alias("seq2")])?
           .project(vec![col("a"), col("b"), col("seq2")])?
           .sort(vec![col("seq2").sort(true, false)])?
           .project(vec![col("a"), col("b")])?              // DROP seq2
           .build()?;
       roundtrip_logical_plan_with_ctx(plan, ctx).await?;   // PASSES
       Ok(())
   }
   ```
   
   This passes because dropping `seq1` lets the plan flatten, so the two
   `row_number()` columns never coexist in a single Window input schema. This
   case isolates the trigger: it is **not** `sort`/`project-except` that breaks
   things.
   
   ### Repro 3: faithful shape, where the window emits seq + running aggregate and the aggregate is carried through (fails)
   
   This mirrors a real chained `streamstats` plan: each window emits a sequence
   column **and** a running aggregate; the running aggregate is carried through
   the intervening projects (only the seq is dropped). Carrying that window
   column is what keeps window #1's output alive in window #2's input.
   
   ```rust
   #[tokio::test]
   async fn chained_window_with_carried_running_aggregate() -> Result<()> {
       use datafusion::functions_aggregate::average::avg_udaf;
       use datafusion::functions_window::expr_fn::row_number;
       use datafusion::logical_expr::expr::WindowFunction;
       use datafusion::logical_expr::{Expr, ExprFunctionExt, WindowFrame};
   
       let ctx = create_context().await?;
       let scan = ctx.table("data").await?.into_optimized_plan()?;
   
       let running1 = Expr::from(WindowFunction::new(avg_udaf(), vec![col("b")]))
           .window_frame(WindowFrame::new(Some(false))).build()?.alias("ravg1");
       let running2 = Expr::from(WindowFunction::new(avg_udaf(), vec![col("b")]))
           .window_frame(WindowFrame::new(Some(false))).build()?.alias("ravg2");
   
       let plan = LogicalPlanBuilder::from(scan)
           .window(vec![row_number().alias("seq1"), running1])?
           .project(vec![col("a"), col("b"), col("seq1"), col("ravg1")])?
           .sort(vec![col("seq1").sort(true, false)])?
           .project(vec![col("a"), col("b"), col("ravg1")])?            // drop seq1, KEEP ravg1
           .window(vec![row_number().alias("seq2"), running2])?
           .project(vec![col("a"), col("b"), col("ravg1"), col("seq2"), col("ravg2")])?
           .sort(vec![col("seq2").sort(true, false)])?
           .project(vec![col("a"), col("b"), col("ravg1"), col("ravg2")])?  // drop seq2
           .build()?;
   
       roundtrip_logical_plan_with_ctx(plan, ctx).await?;
       // FAILS: DuplicateUnqualifiedField {
       //   name: "avg(data.b) ORDER BY [UInt64(1) ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" }
       Ok(())
   }
   ```
   
   ## Suggested fix (direction, not a final patch)
   
   Extend the existing `NameTracker` mechanism so it also dedups against names
   that already exist in the window relation's input schema. Concretely, in
   `from_project_rel`, before building `window_exprs`, seed the tracker (or apply
   `get_uniquely_named_expr`) with the input schema's field names, and pass
   uniquely re-aliased window expressions into `window_plan`, so an inherited
   window column forces the newly built one to receive a `__temp__N` suffix. This
   is the same disambiguation #15211 already uses, just extended across the
   input-schema boundary. It keeps the positional `output_mapping` semantics
   intact (the suffix only affects internal schema names, not the emitted
   ordering).
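A minimal sketch of the proposed direction, under stated assumptions: plain strings stand in for `Expr`s and schema fields, and `SeededNameTracker` / `window_output_names` are hypothetical names, not DataFusion APIs. The tracker is seeded with the window input's field names before the new window expressions are named, so an inherited column forces the new one onto a `__temp__N` suffix (numbering from 0 is an assumption).

```rust
use std::collections::HashSet;

/// Hypothetical tracker that can be pre-seeded with names that already exist
/// in the window relation's input schema (the proposed extension).
struct SeededNameTracker {
    seen: HashSet<String>,
}

impl SeededNameTracker {
    fn seeded_with(input_fields: &[&str]) -> Self {
        Self {
            seen: input_fields.iter().map(|name| name.to_string()).collect(),
        }
    }

    /// Keep `name` if unused, otherwise append the first free `__temp__N`.
    fn unique_name(&mut self, name: &str) -> String {
        if self.seen.insert(name.to_string()) {
            return name.to_string();
        }
        let mut n = 0;
        loop {
            let candidate = format!("{name}__temp__{n}");
            if self.seen.insert(candidate.clone()) {
                return candidate;
            }
            n += 1;
        }
    }
}

/// Names of the window's output fields when the new window expressions are
/// re-aliased against the input schema before the window is built. Input
/// fields keep their names and positions; only new fields may be renamed.
fn window_output_names(input_fields: &[&str], window_exprs: &[&str]) -> Vec<String> {
    let mut tracker = SeededNameTracker::seeded_with(input_fields);
    let mut fields: Vec<String> = input_fields.iter().map(|name| name.to_string()).collect();
    fields.extend(window_exprs.iter().map(|expr| tracker.unique_name(expr)));
    fields
}
```

Seeding, rather than renaming the inherited column, keeps the carried field's name stable for any expression above that still refers to it. Because only names change, the field order (input fields first, then window fields) is the same as before, which is why the positional `output_mapping` is unaffected.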
   
   I'm happy to open a PR along these lines and add the tests above (the passing
   one included, as a guard against "fixing" the bug by breaking the flatten
   path).
   
   ## Environment
   
   - `datafusion` / `datafusion-substrait` `54.0.0` (also present on `branch-54`,
     HEAD `08da279`, and `main`).
   - Discovered via OpenSearch's PPL `streamstats` command, which lowers chained
     `streamstats` into exactly this `window → project(carry) → sort → project(drop seq)`
     shape and ships the plan to a DataFusion backend over Substrait.
   

