gstvg commented on PR #18921: URL: https://github.com/apache/datafusion/pull/18921#issuecomment-3950771759
@rluvaton @comphead I updated the PR to a `LambdaUDF` trait-based implementation. It added [1300 lines](https://github.com/apache/datafusion/pull/18921/commits/e1921eb377a56766b2dd4729311def1c3515c8c4), totaling 3000, mostly boilerplate from `ScalarUDF`, including a lot of documentation. Was that in the range you were expecting?

[Based on the ScalarUDF docs stating that it exists to maintain backwards compatibility with an older API](https://docs.rs/datafusion/latest/datafusion/logical_expr/struct.ScalarUDF.html#api-note), I included only a `LambdaUDF` trait, and not a `struct LambdaUDF` + `trait LambdaUDFImpl` pair, to not make the PR even bigger. There are a few small things missing that I want to implement tomorrow, and if all of you are okay with the results, I want to open this to review. My only concern is that the PR size may delay the review, as in https://github.com/apache/datafusion/pull/17220#issuecomment-3223911837, but since IMHO this PR is simpler, I hope it won't take that long to review despite the size.

> 1. The ScalarUDF trait will not grow too much and make implementing regular scalar UDFs easier or lambda overwhelming
> 2. what if we need to add a required function but only for lambda, we can add it on the new trait with ease and we won't need to do some weird stuff to avoid breaking changes.
> 3. Less ambiguity on the API.

Yeah, I think that 2 is the main point. The previous `ScalarUDF`-based approach only added a single method to a trait that already contains 20, it didn't require any change for non-lambda UDFs (that would be unacceptable), and compared to the actual `LambdaUDF`-based one, lambda UDFs only required 2 additional lines of code per implementation. But, while I can't think of any..., all my previous counter-arguments could fall apart in the future with a single requirement change.
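To make point 2 concrete, here's a minimal sketch of the trade-off. All names here (`ScalarLike`, `LambdaLike`, `lambda_count`, `supports_lambdas`) are illustrative, not the PR's actual API: adding a method to an existing, widely-implemented trait forces a default body to avoid a breaking change, while a dedicated trait can add required methods freely.

```rust
// Illustrative sketch only: these are made-up names, not DataFusion's API.

// An existing trait with many downstream implementors: any new method
// must carry a default body to avoid a breaking change.
trait ScalarLike {
    fn name(&self) -> &str;
    fn supports_lambdas(&self) -> bool {
        false // forced default, even where it makes little sense
    }
}

// A dedicated lambda trait can introduce required methods with ease.
trait LambdaLike {
    fn name(&self) -> &str;
    fn lambda_count(&self) -> usize; // required, no workaround needed
}

struct Sqrt;

impl ScalarLike for Sqrt {
    fn name(&self) -> &str {
        "sqrt"
    }
}

struct ArrayTransform;

impl LambdaLike for ArrayTransform {
    fn name(&self) -> &str {
        "array_transform"
    }
    fn lambda_count(&self) -> usize {
        1
    }
}

fn main() {
    let udf = ArrayTransform;
    println!("{} takes {} lambda(s)", udf.name(), udf.lambda_count());
    println!("{} supports lambdas: {}", Sqrt.name(), Sqrt.supports_lambdas());
}
```

A future lambda-only requirement lands as a new required method on `LambdaLike` without touching any non-lambda implementor.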
There's an upfront cost in review time and time to merge, but also more room to work in the future.

> Support Map, (Large)List, (Large)ListView, FixedSizeList as the input for lambda

Currently any type is accepted and passed to the implementation to derive the parameters from it, usually the inner values of a list, but I want to work with unions too, for example. As of now, though, array_transform doesn't handle ListViews (I thought there were no plans to support them at all?). Since a ListView's values may contain unreferenced values, should it be compacted, or cast to a regular compacted List? And if so, should we return the transformed List or cast it back to a ListView?

> multiple lambdas in a single expression, for example map_key_value(some_map_col, map_key_lambda, map_value_lambda) and each lambda gets different variables

[Supported: LambdaFunctionArgs.args can hold multiple lambdas](https://github.com/apache/datafusion/pull/18921/changes#diff-e464d894d5a0fe37392c40f9e54f67590220578259a8dc41fa57b329f6f0d343R143-R149)

```rust
fn invoke_with_args(&self, args: LambdaFunctionArgs) -> Result<ColumnarValue> {
    let [list_value, lambda] = take_function_args(self.name(), &args.args)?;

    let (ValueOrLambda::Value(list_value), ValueOrLambda::Lambda(lambda)) = (list_value, lambda) else {
        return exec_err!(...
```

Since the beginning I've worked to support multiple lambdas, to implement union-manipulating functions like this:

```sql
union_transform(
    union_value,
    'str', str -> trim(str),
    'num', num -> num*2,
    'bool', bool -> NOT bool,
)
```

> Lambda expression that access columns that are not in the list itself

[Already supported and tested (I call it column capture throughout the PR and comments. Please ignore the comment above the test, I'm going to remove it)](https://github.com/apache/datafusion/pull/18921/changes#diff-53634af9d48017f60cb545dfe2aaab20d1f7fa3435947c161ef22515fa62ed8dR55-R58)

```sql
CREATE TABLE t as SELECT 1 as n;

query ?
SELECT array_transform([1, 2], (e) -> n) from t;
----
[1, 1]
```

> optional arguments for lambda, for example the index of the item in the list, the optional here is important as I want to avoid creating that input if I don't need to

[Also supported](https://github.com/apache/datafusion/pull/18921/changes#diff-e464d894d5a0fe37392c40f9e54f67590220578259a8dc41fa57b329f6f0d343R168-R177)

```rust
// use closures so that bind_lambda_variables evaluates only the params that are actually referenced,
// avoiding unnecessary computations
let values_param = || Ok(Arc::clone(list_values));
let indices_param = || elements_indices(&list_array);

let binded_body = bind_lambda_variables(
    Arc::clone(&lambda.body),
    &lambda.params,
    &[&values_param, &indices_param],
)?;
```

\*After a refactor, `bind_lambda_variables` actually eagerly evaluates all params, but I'll fix that.

> Nested lambda expressions

[Supported](http://github.com/apache/datafusion/pull/18921/changes#diff-53634af9d48017f60cb545dfe2aaab20d1f7fa3435947c161ef22515fa62ed8dR45-R47)

```sql
SELECT array_transform(t.v,
    (v1, i) -> array_transform(v1,
        (v2, j) -> array_transform(v2, v3 -> j))
) from t;
----
[[[0, 1], [0]], [[0]], [[]]]
```

> array_transform, the tricky part for this function is its return type depends on lambda

`LambdaUDF` contains a method `lambdas_parameters` where the implementation must return the types of the parameters of all its lambdas when evaluated with a given set of values. DF then uses this info to compute the type of each lambda and passes it to `return_field_from_args`, so the implementation can easily compute its return type. For example, for the expr `array_transform([1, 2], v -> repeat(v, 'a'))`, `lambdas_parameters` would receive `[ValueOrLambda::Value(List(Int32)), ValueOrLambda::Lambda]` and should return `vec![None, Some(vec![DataType::Int32])]`.
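Going back to the optional-parameters point above, the closure trick can be sketched generically. This is a hand-rolled illustration (the names `bind_params`, `expensive_indices`, and the call counter are mine, not the PR's): parameter providers are closures, so a parameter the lambda doesn't declare is never computed.

```rust
use std::cell::Cell;

thread_local! {
    // Counts how often the expensive provider actually runs.
    static INDEX_CALLS: Cell<usize> = Cell::new(0);
}

// Stand-in for something like `elements_indices`: computing per-element
// indices is the work we want to skip when the lambda has no index param.
fn expensive_indices() -> Vec<i64> {
    INDEX_CALLS.with(|c| c.set(c.get() + 1));
    (0..3).collect()
}

// Evaluate only the providers whose parameter name the lambda declares.
fn bind_params(
    declared: &[&str],
    providers: &[(&str, &dyn Fn() -> Vec<i64>)],
) -> Vec<(String, Vec<i64>)> {
    let mut bound = Vec::new();
    for (name, provider) in providers {
        if declared.contains(name) {
            bound.push((name.to_string(), provider()));
        }
    }
    bound
}

fn main() {
    let values = || vec![10, 20, 30];
    let indices = || expensive_indices();
    let providers: [(&str, &dyn Fn() -> Vec<i64>); 2] =
        [("value", &values), ("index", &indices)];

    // `v -> v * 2` style lambda: only "value" is declared,
    // so the index provider is never invoked.
    let bound = bind_params(&["value"], &providers);
    assert_eq!(bound, vec![("value".to_string(), vec![10, 20, 30])]);
    assert_eq!(INDEX_CALLS.with(|c| c.get()), 0);

    // `(v, i) -> ...` style lambda: both params get evaluated.
    let bound = bind_params(&["value", "index"], &providers);
    assert_eq!(bound.len(), 2);
    assert_eq!(INDEX_CALLS.with(|c| c.get()), 1);
}
```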
Then `return_field_from_args` would be called with `[ValueOrLambda::Value(List(Int32)), ValueOrLambda::Lambda(Utf8)]`, where the implementation would just need to return `List(Utf8)`.

<details><summary>return_field_from_args method</summary>

```rust
pub struct LambdaReturnFieldArgs<'a> {
    /// The data types of the arguments to the function
    ///
    /// If argument `i` to the function is a lambda, it will be the field returned by the
    /// lambda when executed with the arguments returned from `LambdaUDF::lambdas_parameters`
    ///
    /// For example, with `array_transform([1], v -> v == 5)`
    /// this field will be `[Field::new("", DataType::List(DataType::Int32), false), Field::new("", DataType::Boolean, false)]`
    pub arg_fields: &'a [ValueOrLambdaField],

    /// Is argument `i` to the function a scalar (constant)?
    ///
    /// If the argument `i` is not a scalar, it will be None
    ///
    /// For example, if a function is called like `my_function(column_a, 5)`
    /// this field will be `[None, Some(ScalarValue::Int32(Some(5)))]`
    pub scalar_arguments: &'a [Option<&'a ScalarValue>],
}

/// A tagged Field indicating whether it corresponds to a value or a lambda argument
#[derive(Clone, Debug)]
pub enum ValueOrLambdaField {
    /// The Field of a ColumnarValue argument
    Value(FieldRef),
    /// The Field of the return of the lambda body when evaluated with the parameters from LambdaUDF::lambdas_parameters
    Lambda(FieldRef),
}

fn return_field_from_args(
    &self,
    args: datafusion_expr::LambdaReturnFieldArgs,
) -> Result<Arc<Field>> {
    let [ValueOrLambdaField::Value(list), ValueOrLambdaField::Lambda(lambda)] =
        take_function_args(self.name(), args.arg_fields)?
    else {
        return exec_err!(
            "{} expects a value followed by a lambda, got {:?}",
            self.name(),
            args
        );
    };

    //TODO: should metadata be copied into the transformed array?
    // lambda is the resulting field of executing the lambda body
    // with the parameters returned in lambdas_parameters
    let field = Arc::new(Field::new(
        Field::LIST_FIELD_DEFAULT_NAME,
        lambda.data_type().clone(),
        lambda.is_nullable(),
    ));

    let return_type = match list.data_type() {
        DataType::List(_) => DataType::List(field),
        DataType::LargeList(_) => DataType::LargeList(field),
        DataType::FixedSizeList(_, size) => DataType::FixedSizeList(field, *size),
        _ => unreachable!(),
    };

    Ok(Arc::new(Field::new("", return_type, list.is_nullable())))
}
```

</details>

<details><summary>lambdas_parameters method</summary>

```rust
trait LambdaUDF {
    /// Returns the parameters that any lambda supports
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambdaParameter],
    ) -> Result<Vec<Option<Vec<Field>>>>;
}

pub enum ValueOrLambdaParameter<'a> {
    /// A columnar value with the given field
    Value(FieldRef),
    /// A lambda
    Lambda,
}

// array_transform implementation
impl LambdaUDF for ArrayTransform {
    fn lambdas_parameters(
        &self,
        args: &[ValueOrLambdaParameter],
    ) -> Result<Vec<Option<Vec<Field>>>> {
        let [ValueOrLambdaParameter::Value(list), ValueOrLambdaParameter::Lambda] = args else {
            return exec_err!(
                "{} expects a value followed by a lambda, got {:?}",
                self.name(),
                args
            );
        };

        let (field, index_type) = match list.data_type() {
            DataType::List(field) => (field, DataType::Int32),
            DataType::LargeList(field) => (field, DataType::Int64),
            DataType::FixedSizeList(field, _) => (field, DataType::Int32),
            _ => return exec_err!("expected list, got {list}"),
        };

        // we don't need to omit the index when the lambda doesn't declare it, e.g. array_transform([], v -> v*2),
        // nor check whether the lambda contains more than two parameters, e.g. array_transform([], (v, i, j) -> v+i+j),
        // as datafusion will do that for us
        let value = Field::new("value", field.data_type().clone(), field.is_nullable())
            .with_metadata(field.metadata().clone());
        let index = Field::new("index", index_type, false);

        Ok(vec![None, Some(vec![value, index])])
    }
}
```

</details>

> ```rust
> pub struct LambdaExpr {
>     /// Parameter names/types already resolved
>     pub param_types: Vec<DataType>,
>     /// Expression body, what needs to be evaluated, this thing potentially can be UDF
>     pub body: Arc<dyn PhysicalExpr>,
> }
> ```

The current implementation works with only the parameter names, without requiring their datatypes, and IMHO that's easier for users, in both the expr API and SQL. The expr API could look like this:

```rust
array_transform(
    col("my_array"),
    lambda(
        ["current_value"], // define the parameter names
        lambda_variable("current_value") * lit(2) // the lambda body
    )
)
```

How to parse the ClickHouse syntax is, I believe, a question for the future: either check that the types match the ones returned by `LambdaUDF::lambdas_parameters`, or cast the parameters to the specified types.

> 1. replace null lists that is not empty underneath
> 3. fixed size list with a null list
> I'm conflicted on whether we should fix case 1 for them as it is costly and the user might have prior knowledge to avoid that.

Saw your review in #17220 but thought it mattered only for performance and didn't think about fallible expressions... Yeah, maybe we can add a helper UDF that cleans the list on user demand, to be called before the lambda UDF? For fixed size lists, I think we can replace null sublists with the first non-null sublist of the array; if finding a non-null sublist also fails, then it's a user problem.

> 2. sliced list and not computing values outside the sliced data

Also saw this on #17220 but forgot it, thanks for reminding me, will fix soon.

> In every case here, we should answer:
>
> how would the user handle that?
> how could we make it easier for the user?
> How would we help them avoid forgetting handling?

I think the best we can do is document it, add examples and helper functions, and add a lot of comments in one or a few LambdaUDFs as reference implementations.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]
