gstvg commented on PR #18921:
URL: https://github.com/apache/datafusion/pull/18921#issuecomment-3938399669
Thanks @linhr! It mostly applies to having lambdas and args partitioned, omitting the body from `TreeNode`, and removing `Expr::Lambda`. Changing the PR to just use a new `Expr::LambdaFunction(Arc<dyn LambdaUDF>)` where the args are unpartitioned, the body is exposed on `TreeNode`, and `Expr::Lambda` is kept is okay; it wouldn't incur the other cited points, but it would make the PR bigger.
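For concreteness, a hedged sketch of the two shapes being weighed; the struct and field names here are mine, not the PR's:
```rust
// Sketch only: `Expr` is DataFusion's logical Expr; `Lambda` is a
// hypothetical struct holding a lambda's parameters and body.

// Partitioned: lambdas live apart from the regular args, their bodies
// are not visited by TreeNode, and Expr::Lambda can be removed.
struct PartitionedLambdaFunction {
    args: Vec<Expr>,      // regular arguments only
    lambdas: Vec<Lambda>, // lambda arguments, hidden from TreeNode
}

// Unpartitioned: lambdas are ordinary Exprs among the args
// (Expr::Lambda is kept), so their bodies stay visible to TreeNode.
struct UnpartitionedLambdaFunction {
    args: Vec<Expr>, // may contain Expr::Lambda entries
}
```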
Thanks @comphead! The current representation looks somewhat similar to your suggestion, except that it is an `Expr::ScalarFunction`/`ScalarFunctionExpr` instead of a dedicated `LambdaFunction`, and that references to lambda parameters use `LambdaVariable` instead of `Column`, carrying an optional value that must be bound before execution:
```
FunctionExpression // regular ScalarFunction
  name: list_filter
  children:
    1. ListExpression [1, 2, 3, 4]
    2. LambdaExpression // the added physical LambdaExpr / logical Expr::Lambda
       parameters: ["x"]
       body:
         ComparisonExpression (=)
           left:
             ArithmeticExpression (%)
               left: LambdaVariable("x", Field::new("", Int32, false), None)
                     // after binding becomes
                     // LambdaVariable("x", Field::new("", Int32, false), Some([1, 2, 3, 4]))
               right: Constant(2)
           right:
             Constant(0)
```
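Spelled out, the `LambdaVariable` shape implied by the tree above would be roughly the following (a sketch; the PR's actual definition may differ):
```rust
use arrow::datatypes::FieldRef;
use datafusion_expr::ColumnarValue;

/// Sketch: a reference to a lambda parameter, resolved to a concrete
/// field, whose value is bound once per batch before evaluating the body.
struct LambdaVariable {
    /// Parameter name, e.g. "x" in `x -> x % 2 = 0`
    name: String,
    /// Resolved data type/nullability of the parameter
    field: FieldRef,
    /// None until bound; Some(values) once bound for the current batch
    value: Option<ColumnarValue>,
}
```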
The reason why adding `LambdaVariable` is easier than trying to use `Column` is shown in the outdated sections of this PR's description and in the first comment.
If the added code for a `LambdaUDF` is considered worthwhile to avoid modifying `ScalarUDF`, and/or because it's a better representation, while keeping other things unchanged, I'm okay with it. But since you mentioned vectorized execution, I just want to make sure that we are not moving to `LambdaUDF` *only* for performance: contrary to Spark, where the `LambdaVariable` value is set and re-set for every row (which would obviously be terrible for a vectorized engine), here it is set only once per batch, so it can perform as fast as regular expressions. The `LambdaVariable` value being a `ColumnarValue` is just a niche optimization for scalar evaluation; we can change it to support `ArrayRef` only. Native performance has been a goal since the beginning, and a `list_filter` implementation based on this can be easily vectorized. As an example, #17220 got vectorized execution while also being based on `ScalarUDF` and regular physical expressions: [the lambda body is evaluated *once per batch* and its output is used to filter the list values with the `filter` kernel, and then the offsets are adjusted](http://github.com/apache/datafusion/pull/17220/changes#diff-3473f25bb419d339904b4f82e9e4485637dcf2b865e29c7f91ce7d646194f780R357-R387):
```rust
let filter_array = lambda.evaluate(&batch)?;
let ColumnarValue::Array(filter_array) = filter_array else {
    return exec_err!("array_filter requires a lambda that returns an array of booleans");
};
let filter_array = as_boolean_array(&filter_array)?;
let filtered = filter(&values, filter_array)?;

for row_index in 0..list_array.len() {
    if list_array.is_null(row_index) {
        // Handle null arrays by keeping the offset unchanged
        offsets.push_length(0);
        continue;
    }

    let start = value_offsets[row_index];
    let end = value_offsets[row_index + 1];

    let num_true = filter_array
        .slice(start.as_usize(), (end - start).as_usize())
        .true_count();

    offsets.push_length(num_true);
}

let offsets = offsets.finish();

let list_array = GenericListArray::<OffsetSize>::try_new(
    Arc::clone(field),
    offsets,
    filtered,
    nulls.cloned(),
)?;

Ok(Arc::new(list_array))
```
One thing that can be optimized is to use [Buffer::count_set_bits_offset](https://docs.rs/arrow/latest/arrow/buffer/struct.Buffer.html#method.count_set_bits_offset) instead of `BooleanArray::slice` followed by `true_count`, avoiding 1-2 `Arc::clone`s per loop iteration.
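A minimal sketch of that change inside the loop above, going through arrow's `BooleanBuffer` (`values()`/`inner()`/`offset()` are existing arrow APIs; `start`, `end`, and `offsets` are the loop variables from the snippet):
```rust
// Count set bits directly in the underlying buffer, avoiding the
// Arc::clones done by BooleanArray::slice + true_count per iteration.
let bools = filter_array.values(); // &BooleanBuffer
let num_true = bools.inner().count_set_bits_offset(
    bools.offset() + start.as_usize(),
    (end - start).as_usize(),
);
offsets.push_length(num_true);
```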
There's also a faster way to adjust the offsets when the average sub-list length is small, around 64 or less, but it's bigger and a bit more intricate, so I chose to implement `array_transform` instead to keep the PR smaller. A reasonably fast `array_fold` is even more complicated, but still implementable with any lambda approach discussed here.
`array_transform` here is also vectorized: the transforming lambda body is
evaluated only once per batch/invoke/evaluate call.
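Roughly, in the same style as the `array_filter` snippet above (a sketch, assuming the lambda was bound to the flattened list values): the body is evaluated once, and the input list's offsets and null buffer are reused as-is:
```rust
// Sketch: vectorized array_transform. One lambda evaluation per batch,
// over the flattened values; offsets/nulls of the input list are reused.
let transformed = lambda.evaluate(&batch)?;
let ColumnarValue::Array(transformed) = transformed else {
    return exec_err!("array_transform requires a lambda that returns an array");
};
let list_array = GenericListArray::<OffsetSize>::try_new(
    Arc::new(Field::new("item", transformed.data_type().clone(), true)),
    list_array.offsets().clone(),
    transformed,
    list_array.nulls().cloned(),
)?;
Ok(Arc::new(list_array))
```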
<br>
> Then we likely need to Bind x to know what is it and finally rewrite it into something computable, perhaps using UDF of (Int) => Boolean
I believe that by using `LambdaVariable`, planning the lambda body into a physical expr as usual is enough to compute it, without having to use UDFs. One thing we must take into account is column capture, i.e. a lambda body referencing columns of the input plan (e.g. `x -> x > some_col`), which is already fully supported here, including shadowing and projection pushdown (columns from the input plan that appear only within a lambda body). As an example, #17220 does not support that, and if it tried to while using `Expr::Column` for lambda variables and passing those variables in the `RecordBatch` to `PhysicalExpr::evaluate`, it would likely hit the same problems I got in my first approach, which are also shown in the outdated sections of the description and of [the first comment here](https://github.com/apache/datafusion/pull/18921#issuecomment-3573747241). That's why `LambdaVariable` exists. Supporting lambdas without capture support is relatively easy, regardless of the approach.
@rluvaton I see you also [were not fond of adding physical exprs to `ScalarFunctionArgs` in #17220](https://github.com/apache/datafusion/pull/17220#discussion_r2297665061). Any comments here?
<br>
Finally, how would the new `LambdaUDF` trait work? I believe it should look like `ScalarFunction`:
```rust
struct LambdaFunction {
    args: Vec<Expr>,
    fun: Arc<dyn LambdaUDF>,
}

enum Expr {
    ...
    LambdaFunction(LambdaFunction),
}
```
It can also be made more like Spark:
```rust
trait LambdaInvoker {
    ...
    fn invoke(&self, args: Vec<Expr>) -> Box<dyn LambdaInvocation>;
}

enum Expr {
    ...
    LambdaFunction(Box<dyn LambdaInvocation>),
}
```
```
I believe the `LambdaUDF`/`LambdaInvocation` trait would look like `ScalarUDFImpl`, except for the `lambdas_parameters` method, `ReturnFieldArgs` in `return_field_from_args`, and `LambdaFunctionArgs` in the `invoke` method. Or do any of you imagine more fundamental changes?
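To make that concrete, here's a hedged sketch assembled purely from the names above; the signatures are assumptions, not a proposal for the final API:
```rust
use arrow::datatypes::FieldRef;
use datafusion_common::Result;
use datafusion_expr::{ColumnarValue, ReturnFieldArgs, Signature};

/// Hypothetical analogue of ScalarFunctionArgs that also carries the
/// bound lambdas alongside the regular argument values.
struct LambdaFunctionArgs {
    // ...
}

/// Sketch of a LambdaUDF mirroring ScalarUDFImpl.
trait LambdaUDF: Send + Sync {
    fn name(&self) -> &str;
    fn signature(&self) -> &Signature;

    /// The extra method: declares the parameter fields each lambda
    /// argument receives, given the fields of the regular arguments.
    fn lambdas_parameters(&self, arg_fields: &[FieldRef]) -> Result<Vec<Vec<FieldRef>>>;

    /// Same role as ScalarUDFImpl::return_field_from_args.
    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef>;

    /// Same role as ScalarUDFImpl::invoke_with_args, but lambda-aware.
    fn invoke(&self, args: LambdaFunctionArgs) -> Result<ColumnarValue>;
}
```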
Thanks!