gstvg commented on PR #18921:
URL: https://github.com/apache/datafusion/pull/18921#issuecomment-3950771759

   @rluvaton @comphead 
   
   I updated the PR to a `LambdaUDF` trait based implementation. It added [1300 
lines](https://github.com/apache/datafusion/pull/18921/commits/e1921eb377a56766b2dd4729311def1c3515c8c4),
 totaling 3000, mostly boilerplate from `ScalarUDF` including a lot of 
documentation. Was that in the range you were expecting? 
   [Based on the ScalarUDF docs stating that it exists to maintain backwards 
compatibility with an older 
API](https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html#api-note),
 I included only a `LambdaUDF` trait and not a `struct LambdaUDF` + `trait 
LambdaUDFImpl` pair, to avoid making the PR even bigger. There are a few small 
things missing that I want to implement tomorrow, and if all of you are okay 
with the results, I want to open this for review. My only concern is that the 
PR size may delay the review, as in 
https://github.com/apache/datafusion/pull/17220#issuecomment-3223911837, but 
since IMHO this PR is simpler, I hope it won't take that long to review, 
despite the size.
   
   > 1. The ScalarUDF trait will not grow too much and make implementing 
regular scalar UDFs easier or lambda overwhelming
   > 2. what if we need to add a required function but only for lambda, we can 
add it on the new trait with ease and we won't need to do some weird stuff to 
avoid breaking changes.
   > 3. Less ambiguity on the API.
   
   Yeah, I think that 2 is the main point. The previous `ScalarUDF`-based 
approach added only a single method to a trait that already contains 20, and it 
didn't require any change to non-lambda UDFs (that would be unacceptable). 
Compared to the current `LambdaUDF`-based approach, lambda UDFs needed only 2 
additional lines of code per implementation. But, while I can't think of any 
... all my previous counter-arguments could fall apart in the future with a 
single requirement change. There's an upfront cost in review time and time to 
merge, but also more room to work in the future.
   
   > Support Map, (Large)List, (Large)ListView, FixedSizeList as the input for 
lambda
   
   Currently any type is accepted and passed to the implementation, which 
derives the lambda parameters from it, usually the inner values of a list, but 
I want to work with unions too, for example. As of now, though, 
array_transform doesn't handle ListViews (I thought there were no plans to 
support them overall?). Since the ListView values may contain unreferenced 
values, should it be compacted, or cast to a regular compacted List? And if so, 
should we return the transformed List or cast it back to ListView? 
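
One way the compaction option could look, as a minimal sketch: it uses plain vectors instead of Arrow arrays, and `compact_list_view` is a hypothetical helper, not an existing arrow-rs or DataFusion function. It copies only the values referenced by each `(offset, size)` pair and produces regular List-style offsets.

```rust
/// Hypothetical helper (not an arrow-rs API): turns ListView-style
/// (offsets, sizes) over `values` into List-style offsets over a new,
/// compacted values buffer holding only the referenced elements.
fn compact_list_view<T: Clone>(
    offsets: &[usize],
    sizes: &[usize],
    values: &[T],
) -> (Vec<usize>, Vec<T>) {
    assert_eq!(offsets.len(), sizes.len());
    let mut new_offsets = Vec::with_capacity(offsets.len() + 1);
    let mut new_values = Vec::with_capacity(sizes.iter().sum::<usize>());
    new_offsets.push(0);
    for (&offset, &size) in offsets.iter().zip(sizes) {
        // Views may overlap or be out of order; each one is copied independently.
        new_values.extend_from_slice(&values[offset..offset + size]);
        new_offsets.push(new_values.len());
    }
    (new_offsets, new_values)
}
```

The lambda would then run over the compacted values only. Whether the result stays a List or is cast back to a ListView is still the open question above.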
   
   > multiple lambdas in a single expression, for example 
map_key_value(some_map_col, map_key_lambda, map_value_lambda) and each lambda 
gets a different variables
   
   [Supported: LambdaFunctionArgs.args can hold multiple 
lambdas](https://github.com/apache/datafusion/pull/18921/changes#diff-e464d894d5a0fe37392c40f9e54f67590220578259a8dc41fa57b329f6f0d343R143-R149)
   
   ```rust
       fn invoke_with_args(&self, args: LambdaFunctionArgs) -> Result<ColumnarValue> {
           let [list_value, lambda] = take_function_args(self.name(), &args.args)?;
   
           let (ValueOrLambda::Value(list_value), ValueOrLambda::Lambda(lambda)) =
               (list_value, lambda)
           else {
               return exec_err!(...
   ```
   
   From the beginning I've worked to support multiple lambdas, in order to 
implement union-manipulating functions like this:
   ```sql
   union_transform(
       union_value, 
       'str', str -> trim(str),
       'num', num -> num*2,
       'bool', bool -> NOT bool
   )
   ```
   
   > Lambda expression that access columns that are not in the list itself
   
   [Already supported and 
tested](https://github.com/apache/datafusion/pull/18921/changes#diff-53634af9d48017f60cb545dfe2aaab20d1f7fa3435947c161ef22515fa62ed8dR55-R58).
 I call it column capture throughout the PR and comments. Please ignore the 
comment above the test, I'm going to remove it.
   
   ```sql
   CREATE TABLE t as SELECT 1 as n;
   query ?
   SELECT array_transform([1, 2], (e) -> n) from t;
   ----
   [1, 1]
   ```
   
   > optional arguments for lambda, for example the index of the item in the 
list
   the optional here is important as I want to avoid creating that input if I 
don't need to.
   
   [Also 
supported](https://github.com/apache/datafusion/pull/18921/changes#diff-e464d894d5a0fe37392c40f9e54f67590220578259a8dc41fa57b329f6f0d343R168-R177)
   ```rust
           // use closures so that bind_lambda_variables evaluates only the
           // params that are actually referenced, avoiding unnecessary computations
           let values_param = || Ok(Arc::clone(list_values));
           let indices_param = || elements_indices(&list_array);
   
           let binded_body = bind_lambda_variables(
               Arc::clone(&lambda.body),
               &lambda.params,
               &[&values_param, &indices_param],
           )?;
   ```
   
   *After a refactor, bind_lambda_params currently evaluates all params 
eagerly, but I'll fix that.
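
A minimal sketch of the lazy binding idea, using only the standard library. `bind_referenced`, `LazyParam` and the `Vec<i64>` "arrays" are hypothetical stand-ins for illustration, not the PR's actual signatures: each parameter is a closure, and only the ones whose names the lambda body references are ever called.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical stand-in for a lazily computed lambda parameter.
type LazyParam<'a> = &'a dyn Fn() -> Vec<i64>;

/// Evaluates only the parameters whose names the lambda body references.
/// `declared` holds the names the lambda declares, matched positionally
/// to `providers`; `referenced` holds the names the body actually uses.
fn bind_referenced<'a>(
    declared: &[&'a str],
    providers: &[LazyParam],
    referenced: &HashSet<&'a str>,
) -> HashMap<String, Vec<i64>> {
    declared
        .iter()
        .zip(providers)
        // Skip providers for unreferenced names so they are never invoked.
        .filter(|(name, _)| referenced.contains(*name))
        .map(|(name, provider)| (name.to_string(), provider()))
        .collect()
}
```

With `declared = ["v", "i"]` and a body that only mentions `v`, the closure that would build the indices array is never called.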
   
   > Nested lambda expressions
   
   
[Supported](http://github.com/apache/datafusion/pull/18921/changes#diff-53634af9d48017f60cb545dfe2aaab20d1f7fa3435947c161ef22515fa62ed8dR45-R47)
   
   ```sql
   SELECT array_transform(t.v, (v1, i) -> array_transform(v1, (v2, j) -> 
array_transform(v2, v3 -> j)) ) from t;
   ----
   [[[0, 1], [0]], [[0]], [[]]]
   ```
   
   > array_transform, the tricky part for this function is its return type 
depends on lambda
   
   `LambdaUDF` contains a method `lambdas_parameters`, where the implementation 
must return the types of the parameters of all its lambdas when evaluated with 
a given set of values. DF then uses this info to compute the return type of 
each lambda and passes it to `return_field_from_args`, so the implementation 
can easily compute its return type. For example, for the expr 
`array_transform([1, 2], v -> repeat('a', v))`, `lambdas_parameters` would 
receive `[ValueOrLambda::Value(List(Int32)), ValueOrLambda::Lambda]` and should 
return `vec![None, Some(vec![DataType::Int32])]`. Then `return_field_from_args` 
would be called with `[ValueOrLambda::Value(List(Int32)), 
ValueOrLambda::Lambda(Utf8)]`, and the implementation would just need to 
return `List(Utf8)`.
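
The two-phase flow can be sketched with toy types, assuming nothing from Arrow or DataFusion: `Ty`, `Param` and `Resolved` are hypothetical stand-ins for `DataType` and the `ValueOrLambda*` enums, and the two functions only mirror the roles of `lambdas_parameters` and `return_field_from_args`.

```rust
/// Toy stand-in for Arrow data types; not the real `DataType` enum.
#[derive(Clone, Debug, PartialEq)]
enum Ty {
    Int32,
    Utf8,
    List(Box<Ty>),
}

/// Arguments as seen in phase 1: lambda types are still unknown.
enum Param {
    Value(Ty),
    Lambda,
}

/// Arguments as seen in phase 2: each lambda carries its body's return type.
enum Resolved {
    Value(Ty),
    Lambda(Ty),
}

/// Phase 1: `None` for value arguments, the parameter types for lambdas.
fn lambdas_parameters(args: &[Param]) -> Option<Vec<Option<Vec<Ty>>>> {
    match args {
        [Param::Value(Ty::List(elem)), Param::Lambda] => {
            Some(vec![None, Some(vec![(**elem).clone()])])
        }
        _ => None,
    }
}

/// Phase 2: the result is a list of whatever the lambda body returns.
fn return_type(args: &[Resolved]) -> Option<Ty> {
    match args {
        [Resolved::Value(Ty::List(_)), Resolved::Lambda(body)] => {
            Some(Ty::List(Box::new(body.clone())))
        }
        _ => None,
    }
}
```

Between the two calls, the planner types the lambda body using the parameter types from phase 1, so a body that yields a string turns a `List(Int32)` input into a `List(Utf8)` result.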
   
   <details><summary>return_field_from_args method</summary>
   
   ```rust
   pub struct LambdaReturnFieldArgs<'a> {
       /// The data types of the arguments to the function
       ///
       /// If argument `i` to the function is a lambda, it will be the field
       /// returned by the lambda when executed with the arguments returned
       /// from `LambdaUDF::lambdas_parameters`
       ///
       /// For example, with `array_transform([1], v -> v == 5)`
       /// this field will be `[Field::new("", DataType::List(DataType::Int32),
       /// false), Field::new("", DataType::Boolean, false)]`
       pub arg_fields: &'a [ValueOrLambdaField],
       /// Is argument `i` to the function a scalar (constant)?
       ///
       /// If the argument `i` is not a scalar, it will be None
       ///
       /// For example, if a function is called like `my_function(column_a, 5)`
       /// this field will be `[None, Some(ScalarValue::Int32(Some(5)))]`
       pub scalar_arguments: &'a [Option<&'a ScalarValue>],
   }
   
   /// A tagged Field indicating whether it corresponds to a value or a lambda
   /// argument
   #[derive(Clone, Debug)]
   pub enum ValueOrLambdaField {
       /// The Field of a ColumnarValue argument
       Value(FieldRef),
       /// The Field of the return of the lambda body when evaluated with the
       /// parameters from LambdaUDF::lambdas_parameters
       Lambda(FieldRef),
   }
   
       fn return_field_from_args(
           &self,
           args: datafusion_expr::LambdaReturnFieldArgs,
       ) -> Result<Arc<Field>> {
           let [ValueOrLambdaField::Value(list), ValueOrLambdaField::Lambda(lambda)] =
               take_function_args(self.name(), args.arg_fields)?
           else {
               return exec_err!(
                   "{} expects a value followed by a lambda, got {:?}",
                   self.name(),
                   args
               );
           };
   
           //TODO: should metadata be copied into the transformed array?
   
           // lambda is the resulting field of executing the lambda body
           // with the parameters returned in lambdas_parameters
           let field = Arc::new(Field::new(
               Field::LIST_FIELD_DEFAULT_NAME,
               lambda.data_type().clone(),
               lambda.is_nullable(),
           ));
   
           let return_type = match list.data_type() {
               DataType::List(_) => DataType::List(field),
               DataType::LargeList(_) => DataType::LargeList(field),
               DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size),
               _ => unreachable!(),
           };
   
           Ok(Arc::new(Field::new("", return_type, list.is_nullable())))
       }
   ```
   
   </details>
   
   
   <details><summary>lambdas_parameters method</summary>
   
   ```rust
   trait LambdaUDF {
       /// Returns the parameters that any lambda supports
       fn lambdas_parameters(
           &self,
           args: &[ValueOrLambdaParameter],
       ) -> Result<Vec<Option<Vec<Field>>>>;
   }
   
   pub enum ValueOrLambdaParameter {
       /// A columnar value with the given field
       Value(FieldRef),
       /// A lambda
       Lambda,
   }
   
   // array_transform implementation
   
   impl LambdaUDF for ArrayTransform {
       fn lambdas_parameters(
           &self,
           args: &[ValueOrLambdaParameter],
       ) -> Result<Vec<Option<Vec<Field>>>> {
           let [ValueOrLambdaParameter::Value(list), ValueOrLambdaParameter::Lambda] =
               args
           else {
               return exec_err!(
                   "{} expects a value followed by a lambda, got {:?}",
                   self.name(),
                   args
               );
           };
   
           let (field, index_type) = match list.data_type() {
               DataType::List(field) => (field, DataType::Int32),
               DataType::LargeList(field) => (field, DataType::Int64),
               DataType::FixedSizeList(field, _) => (field, DataType::Int32),
               _ => return exec_err!("expected list, got {list}"),
           };
   
           // we don't need to omit the index when the lambda doesn't declare it,
           // e.g. array_transform([], v -> v*2), nor check whether the lambda
           // has more than two parameters, e.g. array_transform([], (v, i, j) -> v+i+j),
           // as datafusion will do that for us
           let value = Field::new("value", field.data_type().clone(), field.is_nullable())
               .with_metadata(field.metadata().clone());
           let index = Field::new("index", index_type, false);
   
           Ok(vec![None, Some(vec![value, index])])
       }
   }
   
   ```
   
   </details>
   
   > pub struct LambdaExpr {
       /// Parameter names/types already resolved
       pub param_types: Vec<DataType>,
       /// Expression body, what needs to be evaluated, this thing potentially 
can be UDF
       pub body: Arc<dyn PhysicalExpr>,
   }
   
   The current implementation works with only the parameter names, without 
requiring their data types, and IMHO that's easier for users, in both the 
expr API and SQL. The expr API could look like this:
   
   ```rust
       array_transform(
           col("my_array"),
           lambda(
               ["current_value"], // define the parameter names
               lambda_variable("current_value") * lit(2) // the lambda body
           )
       )
   ```
   
   How to parse the ClickHouse syntax is, I believe, a question for the future: 
either check that the types match the ones returned by 
`LambdaUDF::lambdas_parameters`, or cast the parameters to the specified type.
   
   > 1. replace null lists that is not empty underneath
   > 3. fixed size list with a null list
   > I'm conflicted on whether we should fix case 1 for them as it is costly 
and the user might have prior knowledge to avoid that.
   
   I saw your review in #17220 but thought it mattered only for performance and 
didn't think about fallible expressions...
   Yeah... maybe we can add a helper UDF that cleans the list on user demand, 
to be called before calling the lambda UDF?
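
A rough sketch of what such a cleaning helper could do, on plain vectors rather than Arrow arrays. `clear_null_rows` is a hypothetical name, and a real version would work on the list's offsets and null buffers. Every null row ends up covering zero values, so the lambda never sees values hidden under nulls.

```rust
/// Hypothetical helper: rewrites List-style offsets so that every null row
/// covers zero values, dropping the values that only null rows referenced.
fn clear_null_rows<T: Clone>(
    offsets: &[usize],
    validity: &[bool],
    values: &[T],
) -> (Vec<usize>, Vec<T>) {
    assert_eq!(offsets.len(), validity.len() + 1);
    let mut new_offsets = vec![0];
    let mut new_values = Vec::new();
    for (row, &valid) in validity.iter().enumerate() {
        if valid {
            // Copy only the values that belong to a non-null row.
            new_values.extend_from_slice(&values[offsets[row]..offsets[row + 1]]);
        }
        // A null row contributes nothing, so its offsets collapse.
        new_offsets.push(new_values.len());
    }
    (new_offsets, new_values)
}
```

For example, with values `[1, 2, 0, 0, 3]` where the two zeros sit under a null row, a fallible body such as `1 / v` would only be evaluated over `[1, 2, 3]` after cleaning. The copy is the cost mentioned in the quote, which is why it would be opt-in.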
   
   For fixed size lists, I think we can replace null sublists with the first 
non-null sublist of the array; if the non-null sublist also fails, then it's a 
user problem.
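
A minimal sketch of that replacement, again on plain slices and with a hypothetical helper name (`fill_null_sublists`). Since a fixed size list cannot shrink a null row to zero values, the values under each null sublist are overwritten with a copy of the first non-null sublist.

```rust
/// Hypothetical helper: overwrites the values under each null sublist with a
/// copy of the first non-null sublist. Returns false if every sublist is null,
/// in which case nothing is changed.
fn fill_null_sublists<T: Clone>(values: &mut [T], validity: &[bool], size: usize) -> bool {
    assert_eq!(values.len(), validity.len() * size);
    let Some(donor) = validity.iter().position(|&valid| valid) else {
        return false;
    };
    let donor_values = values[donor * size..(donor + 1) * size].to_vec();
    for (row, &valid) in validity.iter().enumerate() {
        if !valid {
            values[row * size..(row + 1) * size].clone_from_slice(&donor_values);
        }
    }
    true
}
```

The all-null case is left to the caller here; one option would be to skip evaluating the lambda entirely.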
   
   > 2 .sliced list and not computing values outside the sliced data 
   
   I also saw this on #17220 but forgot about it, thanks for reminding me. Will 
fix soon.
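
For reference, the fix could be as small as the sketch below, assuming List-style offsets on plain slices. `trim_to_slice` is a hypothetical name, not something in the PR. It returns only the window of values the sliced list references, plus offsets rebased to start at 0.

```rust
/// Hypothetical helper: for a sliced list whose offsets do not start at 0 or
/// end at `values.len()`, returns offsets rebased to 0 together with the
/// window of `values` that the slice actually references.
fn trim_to_slice<'a, T>(offsets: &[usize], values: &'a [T]) -> (Vec<usize>, &'a [T]) {
    let (first, last) = match (offsets.first(), offsets.last()) {
        (Some(&first), Some(&last)) => (first, last),
        // No offsets at all: treat it as an empty list array.
        _ => return (vec![0], &values[..0]),
    };
    let rebased: Vec<usize> = offsets.iter().map(|offset| offset - first).collect();
    (rebased, &values[first..last])
}
```

The lambda body would then be evaluated over the returned window only, instead of the whole values buffer.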
   
   > In every case here, we should answer:
   > 
   > how would the user handle that?
   > how could we make it easier for the user?
   > How would we help them avoid forgetting handling?
   
   I think the best we can do is document it, add examples and helper 
functions, and add a lot of comments to one or a few LambdaUDFs as reference 
implementations. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

