gstvg commented on PR #18921:
URL: https://github.com/apache/datafusion/pull/18921#issuecomment-3938399669

   Thanks @linhr! Most of that applies to having lambdas and args partitioned, omitting the body from `TreeNode`, and removing `Expr::Lambda`. Changing the PR to just use a new `Expr::LambdaFunction(Arc<dyn LambdaUDF>)` where the args are unpartitioned, the body is exposed on `TreeNode`, and `Expr::Lambda` is kept is fine; it wouldn't incur the other cited points, but it would make the PR bigger.
   
   Thanks @comphead! The current representation looks somewhat similar to your suggestion, except that it is an `Expr::ScalarFunction`/`ScalarFunctionExpr` instead of a dedicated `LambdaFunction`, and that references to lambda parameters use `LambdaVariable` instead of `Column`, containing an optional value that should be bound before execution:
   
    ```
    FunctionExpression // regular ScalarFunction
      name: list_filter
      children:
        1. ListExpression [1,2,3,4]
        2. LambdaExpression // the added physical LambdaExpr / logical Expr::Lambda
             parameters: ["x"]
             body:
                ComparisonExpression (=)
                  left:
                     ArithmeticExpression (%)
                        left: LambdaVariable("x", Field::new("", Int32, false), None)
                              // after binding becomes
                              // LambdaVariable("x", Field::new("", Int32, false), Some([1, 2, 3, 4]))
                        right: Constant(2)
                  right:
                     Constant(0)
    ```
   
   The reason why adding `LambdaVariable` is easier than trying to use `Column` is shown in the outdated sections of this PR description and in the first comment.
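   
   To make the binding step concrete, here is a minimal sketch (all names are illustrative, not the PR's actual types, and `Vec<i32>` stands in for an Arrow `ArrayRef`): a `LambdaVariable` starts unbound, the enclosing function binds it once per batch, and only then can the body evaluate it.
   
    ```rust
    // Sketch only: `LambdaVar` stands in for the PR's `LambdaVariable`,
    // and `Vec<i32>` stands in for an Arrow `ArrayRef`.
    struct LambdaVar {
        name: String,
        // Bound once per batch by the enclosing function, not per row.
        value: Option<Vec<i32>>,
    }

    impl LambdaVar {
        fn new(name: &str) -> Self {
            Self { name: name.to_string(), value: None }
        }

        // The enclosing function (e.g. list_filter) binds the flattened
        // list values before evaluating the lambda body.
        fn bind(&self, value: Vec<i32>) -> Self {
            Self { name: self.name.clone(), value: Some(value) }
        }

        // Evaluating an unbound variable is a planning/execution bug.
        fn evaluate(&self) -> Result<Vec<i32>, String> {
            self.value
                .clone()
                .ok_or_else(|| format!("lambda variable `{}` was not bound", self.name))
        }
    }
    ```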
   
    If the added code for a `LambdaUDF` is considered worthwhile to avoid modifying `ScalarUDF`, and/or because it's a better representation, while keeping other things unchanged, I'm okay with it. But since you mentioned vectorized execution, I just want to make sure that we are not moving to `LambdaUDF` *only* for performance: contrary to Spark, where the `LambdaVariable` value is set and re-set for every row (which would obviously be terrible for a vectorized engine), here it is only set once per batch, so it can perform as fast as regular expressions. The `LambdaVariable` value being a `ColumnarValue` is just a niche optimization for scalar evaluation; we can change it to support `ArrayRef` only. Native performance has been a goal since the beginning, and a `list_filter` implementation based on this can be easily vectorized. As an example, #17220 got vectorized execution while also being based on `ScalarUDF` and regular physical expressions: [the lambda body is evaluated *once per batch* and its output is used to filter the list values with the `filter` kernel, and then the offsets are adjusted](http://github.com/apache/datafusion/pull/17220/changes#diff-3473f25bb419d339904b4f82e9e4485637dcf2b865e29c7f91ce7d646194f780R357-R387):
   
   ```rust
       let filter_array = lambda.evaluate(&batch)?;
       let ColumnarValue::Array(filter_array) = filter_array else {
            return exec_err!("array_filter requires a lambda that returns an array of booleans");
       };
   
       let filter_array = as_boolean_array(&filter_array)?;
       let filtered = filter(&values, filter_array)?;
   
       for row_index in 0..list_array.len() {
           if list_array.is_null(row_index) {
               // Handle null arrays by keeping the offset unchanged
               offsets.push_length(0);
               continue;
           }
           let start = value_offsets[row_index];
           let end = value_offsets[row_index + 1];
           let num_true = filter_array
               .slice(start.as_usize(), (end - start).as_usize())
               .true_count();
           offsets.push_length(num_true);
       }
       let offsets = offsets.finish();
       let list_array = GenericListArray::<OffsetSize>::try_new(
           Arc::clone(field),
           offsets,
           filtered,
           nulls.cloned(),
       )?;
   
       Ok(Arc::new(list_array))
   ```
   
   One thing that can be optimized is to use [`Buffer::count_set_bits_offset`](https://docs.rs/arrow/latest/arrow/buffer/struct.Buffer.html#method.count_set_bits_offset) instead of `BooleanArray::slice` followed by `true_count`, to avoid 1-2 `Arc::clone`s per loop iteration. There's also a faster way to adjust the offsets when the average sub-list length is small (something like 64 or less), but it's bigger and a bit more intricate, so I chose to implement `array_transform` instead to keep the PR smaller. A reasonably fast `array_fold` is even more complicated, but still implementable in any lambda approach discussed here.
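   
   For illustration, what `count_set_bits_offset` buys can be sketched without Arrow at all: counting the set bits of an LSB-first packed bitmap directly at a bit offset, with no sliced array and no `Arc::clone`. This naive bit-by-bit version only shows the idea; the real Arrow method counts whole words at a time.
   
    ```rust
    // Count the set bits in `bitmap` (LSB-first packed bits, as in
    // Arrow's boolean buffers) starting at bit `offset`, over `len`
    // bits, without allocating or cloning a sliced array.
    fn count_set_bits_offset(bitmap: &[u8], offset: usize, len: usize) -> usize {
        (offset..offset + len)
            .filter(|&i| bitmap[i / 8] & (1 << (i % 8)) != 0)
            .count()
    }
    ```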
   
   `array_transform` here is also vectorized: the transforming lambda body is 
evaluated only once per batch/invoke/evaluate call.
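   
   That once-per-batch shape can be sketched with plain vectors standing in for Arrow arrays (a hypothetical helper, not the PR's code): since `array_transform` maps elements 1:1, the lambda body runs a single time over the flattened child values, and the input offsets are reused unchanged.
   
    ```rust
    // Sketch: transform every element of every sub-list in one pass.
    // `values` is the flattened child array of a ListArray and
    // `offsets` delimits the sub-lists; because the mapping is 1:1,
    // the output offsets are exactly the input offsets.
    fn array_transform(
        values: &[i32],
        offsets: &[usize],
        lambda_body: impl Fn(&[i32]) -> Vec<i32>, // evaluated once per batch
    ) -> (Vec<i32>, Vec<usize>) {
        let transformed = lambda_body(values);
        assert_eq!(transformed.len(), values.len());
        (transformed, offsets.to_vec())
    }
    ```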
   
   <br>
   
   >  Then we likely need to Bind x to know what is it and finally rewrite it 
into something computable, perhaps using UDF of (Int) => Boolean
   
   I believe that by using `LambdaVariable`, planning the lambda body into a physical expr as usual is enough to compute it, without having to use UDFs. One thing that we must take into account is column capture, which is already fully supported here, including shadowing and projection pushdown (columns from the input plan that only appear within a lambda body). As an example, #17220 does not support that, and if it tried to while using `Expr::Column` for lambda variables and passing those variables in the `RecordBatch` to `PhysicalExpr::evaluate`, it would likely hit the same problems I got in my first approach, which are also shown in the outdated sections of the description and of [the first comment here](https://github.com/apache/datafusion/pull/18921#issuecomment-3573747241). That's why `LambdaVariable` exists. Supporting lambdas without capture support is relatively easy, regardless of the approach.
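   
   The shadowing part of capture can be sketched as a name-resolution rule (illustrative types only, not the PR's code): a name inside a lambda body resolves to the innermost enclosing lambda parameter first, and falls back to an input-plan column only when no parameter matches.
   
    ```rust
    // Sketch of name resolution inside nested lambda bodies.
    #[derive(Debug, PartialEq)]
    enum Resolved {
        // A lambda parameter; `depth` says which enclosing lambda it
        // came from (0 = innermost), which is what shadowing needs.
        LambdaVariable { name: String, depth: usize },
        // A captured column from the input plan.
        Column(String),
    }

    // `scopes` is the stack of enclosing lambdas' parameter lists,
    // innermost first.
    fn resolve(name: &str, scopes: &[Vec<&str>], input_columns: &[&str]) -> Option<Resolved> {
        for (depth, params) in scopes.iter().enumerate() {
            if params.contains(&name) {
                return Some(Resolved::LambdaVariable { name: name.to_string(), depth });
            }
        }
        if input_columns.contains(&name) {
            return Some(Resolved::Column(name.to_string()));
        }
        None
    }
    ```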
   
   @rluvaton I see you also [were not fond of adding physical exprs to `ScalarFunctionArgs` on #17220](https://github.com/apache/datafusion/pull/17220#discussion_r2297665061). Any comments here?
   
   <br>
   
   Finally, how would the new `LambdaUDF` trait work?
   I believe it should look like `ScalarFunction`:
   
   ```rust
   struct LambdaFunction {
       args: Vec<Expr>,
       fun: Arc<dyn LambdaUDF>,
   }
   
   enum Expr {
       ...
       Expr::LambdaFunction(LambdaFunction),
   }
   ```
   
   It can also be made more like Spark:
   ```rust
   trait LambdaInvoker {
       ...
       fn invoke(&self, args: Vec<Expr>) -> Box<dyn LambdaInvocation>;
   }
   
   enum Expr {
       ...
       Expr::LambdaFunction(Box<dyn LambdaInvocation>),
   }
   ```
   
   I believe the `LambdaUDF`/`LambdaInvocation` trait would look like `ScalarUDFImpl`, except for the `lambdas_parameters` method, `ReturnFieldArgs` in `return_field_from_args`, and `LambdaFunctionArgs` in the `invoke` method. Or do any of you imagine more fundamental changes?
   
   Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

