jcsherin commented on PR #13201: URL: https://github.com/apache/datafusion/pull/13201#issuecomment-2455422680
TL;DR: For invalid input expressions, built-in window functions fail early, when the logical plan is converted to a physical plan. User-defined window functions, on the other hand, complete planning and fail only during physical execution, because validation of their input expressions runs only at execution time.

Would it not be better for a UDWF to fail early as well, when converting to a physical plan? That would require updating `WindowUDFImpl` so that input expressions are parsed when the window expressions are created (as is done for built-in window functions), and hooking it in here so that argument parsing happens earlier:

https://github.com/apache/datafusion/blob/b61b2fc7163f8a219de092d54a3712dd16725820/datafusion/physical-plan/src/windows/mod.rs#L158-L164

### Edge Case: Empty Table

```sql
DataFusion CLI v42.2.0
> CREATE TABLE t1(v1 BIGINT);
0 row(s) fetched.
Elapsed 0.020 seconds.
```

There are currently no rows in `t1`, so `WindowAggStream::compute_aggregates` returns early:

https://github.com/apache/datafusion/blob/89e96b404f07900469fee31740f43edd8a410a10/datafusion/physical-plan/src/windows/window_agg_exec.rs#L319-L321

As a result, the `nth_value` window function is never executed when this query runs on the empty table `t1`:

```sql
> SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
+-------------------------------------------------------------------------------------------------------------+
| nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING |
+-------------------------------------------------------------------------------------------------------------+
+-------------------------------------------------------------------------------------------------------------+
0 row(s) fetched.
Elapsed 0.018 seconds.
```

After we insert a few values into `t1`, the `nth_value` aggregation is computed.
Only then do we see the expected error message, ```sql > insert into t1 values (123), (456); +-------+ | count | +-------+ | 2 | +-------+ 1 row(s) fetched. Elapsed 0.007 seconds. > SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1; This feature is not implemented: There is only support Literal types for field at idx: 1 in Window Function ``` ### Planning divergence between built-in & user-defined window functions In `main` branch where `nth_value` is a `BuiltInWindowFunction` the argument parsing fails early when mapping logical to physical plan. In explain output there is no physical plan. ```sql > EXPLAIN SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1; +--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | logical_plan | Projection: NTH_VALUE(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING | | | WindowAggr: windowExpr=[[NTH_VALUE(Float64(inf), t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS NTH_VALUE(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] | | | TableScan: t1 projection=[v1] | +--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 1 row(s) fetched. Elapsed 0.009 seconds. 
``` But this is not the case for user-defined window functions. In this branch we instead see that a complete plan is built and failure is happening only when the query executes, ```sql > EXPLAIN SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1; +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | plan_type | plan | +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | logical_plan | Projection: nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING | | | WindowAggr: windowExpr=[[nth_value(Float64(inf), t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] | | | TableScan: t1 projection=[v1] | | physical_plan | ProjectionExec: expr=[nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 as nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING] | | | WindowAggExec: 
wdw=[nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "nth_value(Utf8(\"+Inf\"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }] | | | SortExec: expr=[v1@0 ASC NULLS LAST], preserve_partitioning=[false] | | | MemoryExec: partitions=1, partition_sizes=[0] | | | | +---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ 2 row(s) fetched. Elapsed 0.019 seconds. ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
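The proposal in the TL;DR can be illustrated with a small, self-contained Rust model. Everything in it (`Expr`, `WindowFn`, `validate_args`, `create_physical_plan`, `execute`) is hypothetical and only loosely mirrors `WindowUDFImpl` and `WindowAggStream::compute_aggregates`. It is not DataFusion's API. It assumes a single partition whose frame covers every row and, for simplicity, accepts only an integer literal as the second `nth_value` argument. It reproduces the three behaviors above: planning succeeds, an empty input skips evaluation entirely, and the error appears only once rows exist. A planning-time check, by contrast, rejects the same query immediately.

```rust
// Hypothetical, simplified model: none of these names are DataFusion APIs.

/// Argument expressions, reduced to the shapes that matter here.
#[derive(Debug, PartialEq)]
enum Expr {
    /// A Float64 literal, e.g. `'+Inf'::Double`.
    LitF64(f64),
    /// An integer literal, e.g. the `2` in `NTH_VALUE(x, 2)`.
    LitI64(i64),
    /// A column reference, e.g. `v1`.
    Column(&'static str),
}

/// Hypothetical stand-in for a user-defined window function.
trait WindowFn {
    /// Proposed hook: check argument expressions while building the physical plan.
    fn validate_args(&self, args: &[Expr]) -> Result<(), String>;
    /// Evaluate over one partition whose frame is the whole partition.
    fn evaluate(&self, args: &[Expr], partition_len: usize) -> Result<Vec<Option<f64>>, String>;
}

struct NthValue;

impl NthValue {
    /// Extract `n`; only an integer literal is accepted at index 1.
    fn literal_n(args: &[Expr]) -> Result<i64, String> {
        match args.get(1) {
            Some(Expr::LitI64(n)) => Ok(*n),
            _ => Err("There is only support Literal types for field at idx: 1 in Window Function".to_string()),
        }
    }
}

impl WindowFn for NthValue {
    fn validate_args(&self, args: &[Expr]) -> Result<(), String> {
        // Eager path: the same check the lazy path runs, just earlier.
        Self::literal_n(args).map(|_| ())
    }

    fn evaluate(&self, args: &[Expr], partition_len: usize) -> Result<Vec<Option<f64>>, String> {
        // Lazy path: the check is only reached when rows are evaluated.
        let n = Self::literal_n(args)?;
        let value = match args.first() {
            Some(Expr::LitF64(v)) => *v,
            _ => return Err("only a Float64 literal value is modeled".to_string()),
        };
        // The frame spans the whole partition, so every row sees the same
        // n-th value; it exists only if 1 <= n <= partition_len.
        let nth = if n >= 1 && (n as usize) <= partition_len { Some(value) } else { None };
        Ok(vec![nth; partition_len])
    }
}

/// Hypothetical planner step. `validate_early = false` models the current
/// UDWF path (nothing is checked); `true` models the proposed hook.
fn create_physical_plan(f: &dyn WindowFn, args: &[Expr], validate_early: bool) -> Result<(), String> {
    if validate_early {
        f.validate_args(args)?;
    }
    Ok(())
}

/// Hypothetical execution step, including an early return on zero rows
/// similar to the one in `WindowAggStream::compute_aggregates`.
fn execute(f: &dyn WindowFn, args: &[Expr], num_rows: usize) -> Result<Vec<Option<f64>>, String> {
    if num_rows == 0 {
        return Ok(Vec::new()); // the window function is never evaluated
    }
    f.evaluate(args, num_rows)
}

fn main() {
    // NTH_VALUE('+Inf'::Double, v1): index 1 is a column, not a literal.
    let bad = vec![Expr::LitF64(f64::INFINITY), Expr::Column("v1")];

    // Current behavior: planning succeeds and an empty input hides the error.
    assert!(create_physical_plan(&NthValue, &bad, false).is_ok());
    assert_eq!(execute(&NthValue, &bad, 0), Ok(Vec::new()));
    // Only once rows exist does evaluation reject the argument.
    assert!(execute(&NthValue, &bad, 2).is_err());

    // Proposed behavior: the same arguments are rejected while planning.
    assert!(create_physical_plan(&NthValue, &bad, true).is_err());

    // A valid call, NTH_VALUE('+Inf'::Double, 1), passes both stages.
    let good = vec![Expr::LitF64(f64::INFINITY), Expr::LitI64(1)];
    assert!(create_physical_plan(&NthValue, &good, true).is_ok());
    assert_eq!(execute(&NthValue, &good, 2), Ok(vec![Some(f64::INFINITY); 2]));
    println!("all checks passed");
}
```

In this sketch the eager hook reuses exactly the function the lazy path calls, so moving validation to planning time cannot change which inputs are accepted. It only changes when the error is reported, and it no longer depends on whether the input happens to contain rows.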