jcsherin commented on PR #13201:
URL: https://github.com/apache/datafusion/pull/13201#issuecomment-2455422680

   TL;DR
   
   For invalid input expressions, built-in window functions fail early, while the logical plan is being converted to a physical plan. User-defined window functions, however, complete planning: their input expressions are validated only during physical execution, so that is where they fail.
   
   Would it not be better for a UDWF to fail early, when the physical plan is created?
   
   That would require updating `WindowUDFImpl` so that input expressions are parsed when the window expressions are created (as built-in window functions already do), and hooking it in here so that argument parsing happens earlier:
   
https://github.com/apache/datafusion/blob/b61b2fc7163f8a219de092d54a3712dd16725820/datafusion/physical-plan/src/windows/mod.rs#L158-L164
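
A minimal sketch of the proposal, under stated assumptions. Everything below is hypothetical: `WindowUdfSketch`, its `validate_args` hook, `ArgExpr`, `NthValueSketch`, and `create_udwf_window_expr` are illustrative stand-ins, not the real `WindowUDFImpl` API or the planner code linked above. The only point it shows is that the check runs when the window expression is built, so invalid arguments reject planning rather than execution.

```rust
// Hypothetical sketch: these types are NOT DataFusion APIs. They model a
// `WindowUDFImpl`-like trait gaining a validation hook that runs while the
// physical window expression is created.

/// Simplified stand-in for a physical argument expression.
#[derive(Debug, Clone, PartialEq)]
pub enum ArgExpr {
    Column(String),
    Literal(i64),
}

/// Hypothetical trait; `validate_args` is the assumed new hook.
pub trait WindowUdfSketch {
    fn name(&self) -> &str;

    /// Runs during physical planning. The default accepts anything, so
    /// existing implementations would keep today's behaviour.
    fn validate_args(&self, _args: &[ArgExpr]) -> Result<(), String> {
        Ok(())
    }
}

/// Toy `nth_value` that requires the argument at index 1 to be a literal.
pub struct NthValueSketch;

impl WindowUdfSketch for NthValueSketch {
    fn name(&self) -> &str {
        "nth_value"
    }

    fn validate_args(&self, args: &[ArgExpr]) -> Result<(), String> {
        match args.get(1) {
            Some(ArgExpr::Literal(_)) => Ok(()),
            _ => Err(format!(
                "{}: only literal values are supported for the argument at index 1",
                self.name()
            )),
        }
    }
}

/// Stand-in for the physical window expression returned by the planner.
#[derive(Debug)]
pub struct PlannedWindowExpr {
    pub fun_name: String,
    pub args: Vec<ArgExpr>,
}

/// Hypothetical planning step, roughly where the linked code in
/// `windows/mod.rs` builds the UDWF expression: invalid arguments are
/// rejected here, so no physical plan is produced for them.
pub fn create_udwf_window_expr(
    udwf: &dyn WindowUdfSketch,
    args: Vec<ArgExpr>,
) -> Result<PlannedWindowExpr, String> {
    udwf.validate_args(&args)?;
    Ok(PlannedWindowExpr {
        fun_name: udwf.name().to_string(),
        args,
    })
}
```

In this sketch, passing a column as the second argument (as `v1` is passed in the queries below) makes `create_udwf_window_expr` return `Err` before any plan exists. A default no-op hook would leave UDWFs that do not override it unaffected.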
   
   ### Edge Case: Empty Table
   
   ```sql
   DataFusion CLI v42.2.0
   > CREATE TABLE t1(v1 BIGINT);
   0 row(s) fetched.
   Elapsed 0.020 seconds.
   ```
   
   There are currently no rows in `t1`, so `WindowAggStream::compute_aggregates` returns early:
   
https://github.com/apache/datafusion/blob/89e96b404f07900469fee31740f43edd8a410a10/datafusion/physical-plan/src/windows/window_agg_exec.rs#L319-L321
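
A simplified model of why the empty table hides the error. It assumes, as the TL;DR describes, that the UDWF's argument check runs only once the evaluator is created during execution. `compute_aggregates_sketch`, `create_evaluator`, and `ArgExpr` are hypothetical names, and this is not the real `WindowAggStream` code.

```rust
// Hypothetical model, NOT DataFusion code. Assumption: the UDWF's argument
// check lives in evaluator creation, which only happens when there are rows.

#[derive(Debug, Clone, PartialEq)]
pub enum ArgExpr {
    Column(String),
    Literal(i64),
}

/// Stand-in for creating the partition evaluator; this is where the
/// deferred argument check fails.
fn create_evaluator(args: &[ArgExpr]) -> Result<i64, String> {
    match args.get(1) {
        Some(ArgExpr::Literal(n)) => Ok(*n),
        _ => Err("only literal values are supported for the argument at index 1".to_string()),
    }
}

/// Simplified model of computing window aggregates for one partition of `values`.
pub fn compute_aggregates_sketch(
    values: &[i64],
    args: &[ArgExpr],
) -> Result<Vec<Option<i64>>, String> {
    // Early return on empty input: the evaluator is never created, so an
    // invalid argument goes unnoticed and the query "succeeds".
    if values.is_empty() {
        return Ok(Vec::new());
    }
    let n = create_evaluator(args)?;
    // Toy nth_value over the whole frame: every row gets the n-th value (1-based).
    let nth = if n >= 1 { values.get((n - 1) as usize).copied() } else { None };
    Ok(vec![nth; values.len()])
}
```

With an empty slice, the same invalid arguments produce an empty `Ok` result. With two rows they produce an `Err`, which matches the behaviour shown in the CLI session that follows.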
   
   The `nth_value` window function is therefore never executed when this query runs against the empty table `t1`:
   ```sql
   > SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
   +-------------------------------------------------------------------------------------------------------------+
   | nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING |
   +-------------------------------------------------------------------------------------------------------------+
   +-------------------------------------------------------------------------------------------------------------+
   0 row(s) fetched.
   Elapsed 0.018 seconds.
   ```
   
   After a few values are inserted into `t1`, the `nth_value` aggregation is actually computed, and only then do we see the expected error message:
   
   ```sql
   > insert into t1 values (123), (456);
   +-------+
   | count |
   +-------+
   | 2     |
   +-------+
   1 row(s) fetched.
   Elapsed 0.007 seconds.
   
   > SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
   This feature is not implemented: There is only support Literal types for field at idx: 1 in Window Function
   ```
   
   ### Planning divergence between built-in & user-defined window functions
   
   On the `main` branch, where `nth_value` is a `BuiltInWindowFunction`, argument parsing fails early, while the logical plan is mapped to a physical plan. As a result, the `EXPLAIN` output contains no physical plan:
   ```sql
   > EXPLAIN SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM t1;
   +--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type    | plan                                                                                                                                                                                                                                                     |
   +--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan | Projection: NTH_VALUE(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING                                                                                                                                  |
   |              |   WindowAggr: windowExpr=[[NTH_VALUE(Float64(inf), t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS NTH_VALUE(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] |
   |              |     TableScan: t1 projection=[v1]                                                                                                                                                                                                                        |
   +--------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   1 row(s) fetched.
   Elapsed 0.009 seconds.
   ``` 
   
   This is not the case for user-defined window functions. On this branch, a complete physical plan is built, and the failure happens only when the query executes:
   ```sql
   > EXPLAIN SELECT NTH_VALUE('+Inf'::Double, v1) OVER (PARTITION BY v1) FROM 
t1;
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                   |
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Projection: nth_value(Utf8("+Inf"),t1.v1) PARTITION BY 
[t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING                
                                                                                
                                                                                
                                                                                
                                                                                
                       |
   |               |   WindowAggr: windowExpr=[[nth_value(Float64(inf), t1.v1) 
PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING 
AS nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING]]                                             
                                                                                
                                                                                
                         |
   |               |     TableScan: t1 projection=[v1]                          
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                   |
   | physical_plan | ProjectionExec: expr=[nth_value(Utf8("+Inf"),t1.v1) 
PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@1 
as nth_value(Utf8("+Inf"),t1.v1) PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED 
PRECEDING AND UNBOUNDED FOLLOWING]                                              
                                                                                
                                                                                
                             |
   |               |   WindowAggExec: wdw=[nth_value(Utf8("+Inf"),t1.v1) 
PARTITION BY [t1.v1] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: 
Ok(Field { name: "nth_value(Utf8(\"+Inf\"),t1.v1) PARTITION BY [t1.v1] ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Float64, 
nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)), is_causal: false }] |
   |               |     SortExec: expr=[v1@0 ASC NULLS LAST], 
preserve_partitioning=[false]                                                   
                                                                                
                                                                                
                                                                                
                                                                                
                                    |
   |               |       MemoryExec: partitions=1, partition_sizes=[0]        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                   |
   |               |                                                            
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
                   |
   
+---------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   2 row(s) fetched.
   Elapsed 0.019 seconds.
   ```
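
The following hypothetical model (not DataFusion code) makes the divergence concrete by showing where argument parsing runs for each kind of function. `WindowFunKind`, `plan_window_expr`, `parse_nth_value_args`, and `create_evaluator` are illustrative names. The built-in path parses the arguments during physical planning. The UDWF path defers parsing to evaluator creation at execution time.

```rust
// Hypothetical sketch, NOT DataFusion code: it contrasts where argument
// validation happens on `main` (built-in) and on this branch (UDWF).

#[derive(Debug, Clone, PartialEq)]
pub enum ArgExpr {
    Column(String),
    Literal(i64),
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum WindowFunKind {
    BuiltIn,
    UserDefined,
}

/// The same check for both kinds; only *when* it runs differs.
fn parse_nth_value_args(args: &[ArgExpr]) -> Result<i64, String> {
    match args.get(1) {
        Some(ArgExpr::Literal(n)) => Ok(*n),
        _ => Err("only literal values are supported for the argument at index 1".to_string()),
    }
}

/// Stand-in for a physical window expression in the plan.
#[derive(Debug)]
pub struct PhysicalWindowExpr {
    pub kind: WindowFunKind,
    args: Vec<ArgExpr>,
}

/// Physical planning: only the built-in path parses the arguments, so only
/// it can stop `EXPLAIN` from printing a physical plan.
pub fn plan_window_expr(
    kind: WindowFunKind,
    args: Vec<ArgExpr>,
) -> Result<PhysicalWindowExpr, String> {
    if kind == WindowFunKind::BuiltIn {
        parse_nth_value_args(&args)?;
    }
    Ok(PhysicalWindowExpr { kind, args })
}

impl PhysicalWindowExpr {
    /// Execution: the UDWF path parses the arguments here, once rows arrive.
    pub fn create_evaluator(&self) -> Result<i64, String> {
        parse_nth_value_args(&self.args)
    }
}
```

In this model, moving the `parse_nth_value_args` call for `UserDefined` into `plan_window_expr` would make both kinds fail at the same point, which is the change proposed in the TL;DR.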


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

