mwylde opened a new issue, #13401:
URL: https://github.com/apache/datafusion/issues/13401

   ### Describe the bug
   
   Recently, window functions were migrated to UDFs (as part of #8709). This 
appears to have broken protobuf serialization of some window functions 
(specifically, those that get planned into BuiltInWindowExpr), producing an 
error like:
   
   > BuiltIn function not supported: WindowUDFExpr { fun: WindowUDF { inner: 
RowNumber { signature: Signature { type_signature: Any(0), volatility: 
Immutable } } }, args: [], name: "row_number() PARTITION BY [window] ORDER BY 
[count DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", 
input_types: [], is_reversed: false, ignore_nulls: false }
   
   
   An example query that triggers this is:
   
   ```sql
   CREATE TABLE test (
       a BIGINT,
       b BIGINT
   );
   
   SELECT
       a,
       ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS row_num
   FROM
       test;
   ```
   
   which produces the logical plan
   
   ```
   ...
                   window_expr: [
                       WindowFunction(
                           WindowFunction {
                               fun: WindowUDF(WindowUDF { inner: RowNumber }),
                               args: [],
                               partition_by: [Column(Column { relation: 
Some(Bare { table: "test" }), name: "a" })],
                               order_by: [
                                   Sort { expr: Column(Column { relation: 
Some(Bare { table: "test" }), name: "b" }), asc: false, nulls_first: true }
                               ],
                               window_frame: WindowFrame {
                                   units: Range,
                                   start_bound: Preceding(NULL),
                                   end_bound: CurrentRow
                               }
                           }
                       )
                   ],
   ...
   ```
   
   and the physical plan
   
   ```
       input: BoundedWindowAggExec {
           input: SortExec {
               input: MemoryExec { ...  },
               expr: LexOrdering {...},
           },
           window_expr: [
               BuiltInWindowExpr {
                   expr: WindowUDFExpr {
                       fun: WindowUDF { inner: RowNumber },
                       args: [],
                       name: "row_number() PARTITION BY [test.a] ORDER BY 
[test.b DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW",
                       partition_by: [Column { name: "a", index: 0 }],
                       order_by: LexOrdering {
                           inner: [
                               PhysicalSortExpr { expr: Column { name: "b", 
index: 1 }, options: SortOptions { descending: true, nulls_first: true } }
                           ]
                       },
                       window_frame: WindowFrame {
                           units: Range,
                           start_bound: Preceding(Int64(NULL)),
                           end_bound: CurrentRow
                       }
                   }
               }
           ],
           schema: Schema { ...  },
           partition_keys: [Column { name: "a", index: 0 }]
       }
   
   ```
   
   The error comes from here:
   
   
https://github.com/apache/datafusion/blob/ccf6258a1e02eb01af436f78c7f6430be19fa59c/datafusion/proto/src/physical_plan/to_proto.rs#L124
   
   As we can see, there is no logic there that handles the BuiltInWindowExpr case.
   
   
   
   ### To Reproduce
   
   Here's a test case that demonstrates the issue:
   
   ```rust
   /// Reproduces the protobuf serialization failure for a window UDF that is
   /// planned into a `BuiltInWindowExpr`.
   ///
   /// Builds a `row_number()` window expression over a two-column `(a, b)`
   /// schema, partitioned by `a` and ordered by `b`, wraps it in a
   /// `BoundedWindowAggExec` on top of an `EmptyExec`, and passes the plan to
   /// `roundtrip_test`.
   #[test]
   fn roundtrip_built_in_window() -> Result<()> {
       let field_a = Field::new("a", DataType::Int64, false);
       let field_b = Field::new("b", DataType::Int64, false);
       let schema = Arc::new(Schema::new(vec![field_a, field_b]));

       let udf = Arc::new(WindowUDF::new_from_impl(RowNumber::new()));

       // The trailing `\` continues the literal on the next line without
       // embedding a newline (or the leading indentation) in the name.
       let name = "row_number() PARTITION BY [a] ORDER BY [b] RANGE BETWEEN \
                   UNBOUNDED PRECEDING AND CURRENT ROW"
           .to_string();

       let built_in_window_expr = Arc::new(BuiltInWindowExpr::new(
           create_udwf_window_expr(&udf, &[], &schema, name, false)?,
           &[col("a", &schema)?],
           &LexOrdering::new(vec![PhysicalSortExpr::new(
               col("b", &schema)?,
               SortOptions::new(true, true),
           )]),
           Arc::new(WindowFrame::new(None)),
       ));

       let input = Arc::new(EmptyExec::new(schema.clone()));

       roundtrip_test(Arc::new(BoundedWindowAggExec::try_new(
           vec![built_in_window_expr],
           input,
           vec![col("a", &schema)?],
           InputOrderMode::Sorted,
       )?))
   }
   ```
   
   ### Expected behavior
   
   We should be able to serialize window functions to protobuf.
   
   ### Additional context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to