mwylde opened a new issue, #13401:
URL: https://github.com/apache/datafusion/issues/13401
### Describe the bug
Recently, window functions were migrated to UDFs (as part of #8709). This
appears to have broken protobuf serialization of some types of window functions
(specifically, those that get planned into BuiltInWindowExpr), producing an
error like:
> BuiltIn function not supported: WindowUDFExpr { fun: WindowUDF { inner:
RowNumber { signature: Signature { type_signature: Any(0), volatility:
Immutable } } }, args: [], name: "row_number() PARTITION BY [window] ORDER BY
[count DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW",
input_types: [], is_reversed: false, ignore_nulls: false }
An example query that triggers this is:
```sql
CREATE TABLE test (
a BIGINT,
b BIGINT
);
SELECT
a,
ROW_NUMBER() OVER (PARTITION BY a ORDER BY b DESC) AS row_num
FROM
test;
```
which produces the logical plan
```
...
window_expr: [
WindowFunction(
WindowFunction {
fun: WindowUDF(WindowUDF { inner: RowNumber }),
args: [],
partition_by: [Column(Column { relation:
Some(Bare { table: "test" }), name: "a" })],
order_by: [
Sort { expr: Column(Column { relation:
Some(Bare { table: "test" }), name: "b" }), asc: false, nulls_first: true }
],
window_frame: WindowFrame {
units: Range,
start_bound: Preceding(NULL),
end_bound: CurrentRow
}
}
)
],
...
```
and the physical plan
```
input: BoundedWindowAggExec {
input: SortExec {
input: MemoryExec { ... },
expr: LexOrdering {...},
window_expr: [
BuiltInWindowExpr {
expr: WindowUDFExpr {
fun: WindowUDF { inner: RowNumber },
args: [],
name: "row_number() PARTITION BY [test.a] ORDER BY
[test.b DESC NULLS FIRST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW",
partition_by: [Column { name: "a", index: 0 }],
order_by: LexOrdering {
inner: [
PhysicalSortExpr { expr: Column { name: "b",
index: 1 }, options: SortOptions { descending: true, nulls_first: true } }
]
},
window_frame: WindowFrame {
units: Range,
start_bound: Preceding(Int64(NULL)),
end_bound: CurrentRow
}
}
}
],
schema: Schema { ... },
partition_keys: [Column { name: "a", index: 0 }]
}
```
The error comes from here:
https://github.com/apache/datafusion/blob/ccf6258a1e02eb01af436f78c7f6430be19fa59c/datafusion/proto/src/physical_plan/to_proto.rs#L124
As we can see, there is no logic handling the `BuiltInWindowExpr` case.
### To Reproduce
Here's a test case that demonstrates the issue:
```rust
/// Reproduction for the reported failure: builds a `BoundedWindowAggExec`
/// whose single window expression is a `BuiltInWindowExpr` wrapping the
/// `RowNumber` window UDF, then hands the plan to `roundtrip_test`.
///
/// NOTE(review): `roundtrip_test` is defined outside this snippet; presumably
/// it serializes the plan to protobuf and back and compares the result —
/// confirm against the helper.
#[test]
fn roundtrip_built_in_window() -> Result<()> {
// Two Int64 fields, `a` and `b`, make up the input schema.
let field_a = Field::new("a", DataType::Int64, false);
let field_b = Field::new("b", DataType::Int64, false);
let schema = Arc::new(Schema::new(vec![field_a, field_b]));
// Wrap the `RowNumber` implementation as a window UDF.
let udf = Arc::new(WindowUDF::new_from_impl(RowNumber::new()));
// `BuiltInWindowExpr::new` receives, in order: the UDF-backed window
// expression, the partition-by expressions, the ordering, and the frame.
let built_in_window_expr = Arc::new(BuiltInWindowExpr::new(
create_udwf_window_expr(
&udf,
// No arguments are passed to the window function.
&[],
&*schema,
// Display name for the expression. The literal spans two source lines,
// so the resulting string contains an embedded newline.
"row_number() PARTITION BY [a] ORDER BY [b] RANGE BETWEEN
UNBOUNDED PRECEDING AND CURRENT ROW".to_string(),
// NOTE(review): presumably the `ignore_nulls` flag — confirm against
// the signature of `create_udwf_window_expr`.
false,
)?,
// Partition by column `a`.
&[
col("a", &*schema)?
],
// Order by column `b`.
// NOTE(review): `SortOptions::new(true, true)` presumably means
// descending with nulls first — confirm the argument order.
&LexOrdering::new(vec![
PhysicalSortExpr::new(col("b", &*schema)?,
SortOptions::new(true, true)),
]),
// NOTE(review): `WindowFrame::new(None)` presumably selects a default
// frame — confirm what `None` means here.
Arc::new(WindowFrame::new(None)),
));
// An `EmptyExec` over the same schema serves as the input plan.
let input = Arc::new(EmptyExec::new(schema.clone()));
// Build the window exec with `a` as the partition key and
// `InputOrderMode::Sorted`; `?` propagates a construction error before
// `roundtrip_test` is called.
roundtrip_test(Arc::new(BoundedWindowAggExec::try_new(
vec![built_in_window_expr],
input,
vec![col("a", &schema)?],
InputOrderMode::Sorted)?))
}
```
### Expected behavior
We should be able to serialize window functions to protobuf.
### Additional context
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]