alamb commented on code in PR #7359: URL: https://github.com/apache/arrow-datafusion/pull/7359#discussion_r1304409307
########## docs/source/library-user-guide/working-with-exprs.md: ########## @@ -19,4 +19,205 @@ # Working with Exprs -Coming Soon +<!-- https://github.com/apache/arrow-datafusion/issues/7304 --> + +`Expr` is short for "expression". It is a core abstraction in DataFusion for representing a computation. + +For example, the SQL expression `a + b` would be represented as an `Expr` with a `BinaryExpr` variant. A `BinaryExpr` has a left and right `Expr` and an operator. + +As another example, the SQL expression `a + b * c` would be represented as an `Expr` with a `BinaryExpr` variant. The left `Expr` would be `a` and the right `Expr` would be another `BinaryExpr` with a left `Expr` of `b` and a right `Expr` of `c`. + +As the writer of a library, you may want to use or create `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF. + +There are also executable examples for working with `Expr`s: + +- [rewrite_expr.rs](../../../datafusion-examples/examples/catalog.rs) +- [expr_api.rs](../../../datafusion-examples/examples/expr_api.rs) + +## A Scalar UDF Example + +Let's start by creating our own `Expr` in the form of a Scalar UDF. This UDF will simply add 1 to the input value. + +```rust +pub fn add_one(args: &[ArrayRef]) -> Result<ArrayRef> { + let i64s = as_int64_array(&args[0])?; + + let array = i64s + .iter() + .map(|item| match item { + Some(value) => Some(value + 1), + None => None, + }) + .collect::<Int64Array>(); + + Ok(Arc::new(array)) +} +``` + +And our `ScalarUDF` would look like this. Please see the section on [adding UDFs](./adding-udfs.md) for more information on how to create a `ScalarUDF`. 
+ +```rust +let add_one_udf = create_udf( + "add_one", + vec![DataType::Int64], + Arc::new(DataType::Int64), + Volatility::Immutable, + make_scalar_function(add_one), +); +``` + +## Creating Exprs Programmatically + +In addition to SQL strings, you can create `Expr`s programatically. This is common if you're working with a DataFrame vs. a SQL string. A simple example is: + +```rust +use datafusion::prelude::*; + +let expr = lit(5) + lit(5); +``` + +This is obviously a very simple example, but it shows how you can create an `Expr` from a literal value and sets us up later for how to simplify `Expr`s. You can also create `Expr`s from column references and operators: + +```rust +use datafusion::prelude::*; + +let expr = col("a") + col("b"); +``` + +In fact, the `add_one_udf` we created earlier is also an `Expr`. And because it's so simple, we'll use it as fodder for how to rewrite `Expr`s. + +## Rewriting Exprs + +Rewriting Expressions is the process of taking an `Expr` and transforming it into another `Expr`. This is useful for a number of reasons, including: + +- Simplifying `Expr`s to make them easier to evaluate +- Optimizing `Expr`s to make them faster to evaluate +- Converting `Expr`s to other forms, e.g. converting a `BinaryExpr` to a `CastExpr` + +In our example, we'll use rewriting to update our `add_one` UDF, to be rewritten as a `BinaryExpr` with a `Literal` of 1. We're effectively inlining the UDF. + +### Rewriting with `transform` + +To implement the inlining, we'll need to write a function that takes an `Expr` and returns a `Result<Expr>`. If the expression is _not_ to be rewritten `Transformed::No` is used to wrap the original `Expr`. If the expression _is_ to be rewritten, `Transformed::Yes` is used to wrap the new `Expr`. 
+ +```rust +fn rewrite_add_one(expr: Expr) -> Result<Expr> { + expr.transform(&|expr| { + Ok(match expr { + Expr::ScalarUDF(scalar_fun) => { + // rewrite the expression if the function is "add_one", otherwise return the original expression + if scalar_fun.fun.name == "add_one" { + let input_arg = scalar_fun.args[0].clone(); + + let new_expression = BinaryExpr::new( + Box::new(input_arg), + datafusion::logical_expr::Operator::Plus, + Box::new(Expr::Literal(datafusion::scalar::ScalarValue::Int64(Some( + 1, + )))), + ); + + Transformed::Yes(Expr::BinaryExpr(new_expression)) + } else { + // a scalar function that is not "add_one" is not rewritten + Transformed::No(Expr::ScalarUDF(scalar_fun)) + } + } + _ => Transformed::No(expr), // not a scalar function, so not rewritten + }) + }) +} +``` + +### Creating an `OptimizerRule` + +In DataFusion, an `OptimizerRule` is a trait that supports rewriting `Expr`s. It follows DataFusion's general mantra of trait implementations to drive behavior. + +We'll call our rule `AddOneInliner` and implement the `OptimizerRule` trait. The `OptimizerRule` trait has two methods: + +- `name` - returns the name of the rule +- `try_optimize` - takes a `LogicalPlan` and returns an `Option<LogicalPlan>`. If the rule is able to optimize the plan, it returns `Some(LogicalPlan)` with the optimized plan. If the rule is not able to optimize the plan, it returns `None`. 
+ +```rust +struct AddOneInliner {} + +impl OptimizerRule for AddOneInliner { + fn name(&self) -> &str { + "add_one_inliner" + } + + fn try_optimize( + &self, + plan: &LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result<Option<LogicalPlan>> { + // recurse down and optimize children first + let optimized_plan = utils::optimize_children(self, plan, config)?; + + match optimized_plan { + Some(LogicalPlan::Projection(projection)) => { + let proj_expression = projection + .expr + .iter() + .map(|expr| rewrite_add_one(expr.clone())) + .collect::<Result<Vec<_>>>()?; + + let proj = Projection::try_new(proj_expression, projection.input)?; + + Ok(Some(LogicalPlan::Projection(proj))) + } + Some(optimized_plan) => Ok(Some(optimized_plan)), + None => match plan { + LogicalPlan::Projection(projection) => { + let proj_expression = projection + .expr + .iter() + .map(|expr| rewrite_add_one(expr.clone())) + .collect::<Result<Vec<_>>>()?; + + let proj = Projection::try_new(proj_expression, projection.input.clone())?; + + Ok(Some(LogicalPlan::Projection(proj))) + } + _ => Ok(None), + }, + } + } +} +``` + +Note the use of `rewrite_add_one` which is mapped over the `expr` of the `Projection`. This is the function we wrote earlier that takes an `Expr` and returns a `Result<Expr>`. + +We're almost there. Let's just test our rule works properly. + +## Testing the Rule + +Testing the rule is fairly simple, we can create a SessionState with our rule and then create a DataFrame and run a query. The logical plan will be optimized by our rule. 
+ +```rust +use datafusion::prelude::*; + +let rules = Arc::new(AddOneInliner {}); +let state = ctx.state().with_optimizer_rules(vec![rules]); + +let ctx = SessionContext::with_state(state); +ctx.register_udf(add_one); + +let sql = "SELECT add_one(1) AS added_one"; +let plan = ctx.sql(sql).await?.logical_plan(); + +println!("{:?}", plan); +``` + +This results in the following output: + +```text +Projection: Int64(1) + Int64(1) AS added_one + EmptyRelation +``` + +I.e. the `add_one` UDF has been inlined into the projection. + +## Conclusion + +In this guide, we've seen how to create `Expr`s programmatically and how to rewrite them. This is useful for simplifying and optimizing `Expr`s. We've also seen how to test our rule to ensure it works properly. Review Comment: This is very cool ❤️ ########## docs/source/library-user-guide/working-with-exprs.md: ########## @@ -19,4 +19,205 @@ # Working with Exprs -Coming Soon +<!-- https://github.com/apache/arrow-datafusion/issues/7304 --> + +`Expr` is short for "expression". It is a core abstraction in DataFusion for representing a computation. + +For example, the SQL expression `a + b` would be represented as an `Expr` with a `BinaryExpr` variant. A `BinaryExpr` has a left and right `Expr` and an operator. + +As another example, the SQL expression `a + b * c` would be represented as an `Expr` with a `BinaryExpr` variant. The left `Expr` would be `a` and the right `Expr` would be another `BinaryExpr` with a left `Expr` of `b` and a right `Expr` of `c`. + +As the writer of a library, you may want to use or create `Expr`s to represent computations that you want to perform. This guide will walk you through how to make your own scalar UDF as an `Expr` and how to rewrite `Expr`s to inline the simple UDF. 
+ +There are also executable examples for working with `Expr`s: + +- [rewrite_expr.rs](../../../datafusion-examples/examples/catalog.rs) +- [expr_api.rs](../../../datafusion-examples/examples/expr_api.rs) + +## A Scalar UDF Example + +Let's start by creating our own `Expr` in the form of a Scalar UDF. This UDF will simply add 1 to the input value. + +```rust +pub fn add_one(args: &[ArrayRef]) -> Result<ArrayRef> { + let i64s = as_int64_array(&args[0])?; + + let array = i64s + .iter() + .map(|item| match item { + Some(value) => Some(value + 1), + None => None, + }) + .collect::<Int64Array>(); + + Ok(Arc::new(array)) +} +``` + +And our `ScalarUDF` would look like this. Please see the section on [adding UDFs](./adding-udfs.md) for more information on how to create a `ScalarUDF`. + +```rust +let add_one_udf = create_udf( + "add_one", + vec![DataType::Int64], + Arc::new(DataType::Int64), + Volatility::Immutable, + make_scalar_function(add_one), +); +``` Review Comment: I think this section could mostly be a reference to `./adding-udfs.md`, and then maybe show an example of making an `Expr` that calls the UDF. Something like:

```rust
let add_one_udf = create_udf(/* example from before */);

// make add_one_udf(5); `ScalarUDF::call` takes its arguments as a `Vec<Expr>`
let expr = add_one_udf.call(vec![lit(5)]);

// make add_one_udf(my_column)
let expr = add_one_udf.call(vec![col("my_column")]);
```

########## docs/source/library-user-guide/adding-udfs.md: ########## @@ -19,4 +19,107 @@ # Adding User Defined Functions: Scalar/Window/Aggregate -Coming Soon +User Defined Functions (UDFs) are functions that can be used in the context of DataFusion execution. + +This page covers how to add UDFs to DataFusion. In particular, it covers how to add Scalar, Window, and Aggregate UDFs. 
+ +| UDF Type | Description | Example | +| --------- | ---------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------- | +| Scalar | A function that takes a row of data and returns a single value. | [simple_udf.rs](../../../datafusion-examples/examples/simple_udf.rs) | +| Window | A function that takes a row of data and returns a single value, but also has access to the rows around it. | [simple_udwf.rs](../../../datafusion-examples/examples/simple_udwf.rs) | +| Aggregate | A function that takes a group of rows and returns a single value. | [simple_udaf.rs](../../../datafusion-examples/examples/simple_udaf.rs) | + +First we'll talk about adding an Scalar UDF end-to-end, then we'll talk about the differences between the different types of UDFs. + +## Adding a Scalar UDF + +A Scalar UDF is a function that takes a row of data and returns a single value. For example, this function takes a single i64 and returns a single i64 with 1 added to it: + +```rust +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int64Array}; +use datafusion::common::Result; + +use datafusion::common::cast::as_int32_array; + +pub fn add_one(args: &[ArrayRef]) -> Result<ArrayRef> { + let i64s = as_int32_array(&args[0])?; Review Comment: I think this converts to `Int32` rather than `Int64` -- so perhaps for consistency it should be ```suggestion let i64s = as_int64_array(&args[0])?; ``` ########## docs/source/library-user-guide/working-with-exprs.md: ########## @@ -19,4 +19,205 @@ # Working with Exprs -Coming Soon +<!-- https://github.com/apache/arrow-datafusion/issues/7304 --> + +`Expr` is short for "expression". It is a core abstraction in DataFusion for representing a computation. Review Comment: ```suggestion `Expr` is short for "expression". 
It is a core abstraction in DataFusion for representing a computation, and follows the standard "expression tree" abstraction found in most compilers and databases. ``` ########## docs/source/library-user-guide/adding-udfs.md: ########## @@ -19,4 +19,107 @@ # Adding User Defined Functions: Scalar/Window/Aggregate -Coming Soon +User Defined Functions (UDFs) are functions that can be used in the context of DataFusion execution. + +This page covers how to add UDFs to DataFusion. In particular, it covers how to add Scalar, Window, and Aggregate UDFs. + +| UDF Type | Description | Example | +| --------- | ---------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------- | +| Scalar | A function that takes a row of data and returns a single value. | [simple_udf.rs](../../../datafusion-examples/examples/simple_udf.rs) | +| Window | A function that takes a row of data and returns a single value, but also has access to the rows around it. | [simple_udwf.rs](../../../datafusion-examples/examples/simple_udwf.rs) | +| Aggregate | A function that takes a group of rows and returns a single value. | [simple_udaf.rs](../../../datafusion-examples/examples/simple_udaf.rs) | + +First we'll talk about adding an Scalar UDF end-to-end, then we'll talk about the differences between the different types of UDFs. + +## Adding a Scalar UDF + +A Scalar UDF is a function that takes a row of data and returns a single value. 
For example, this function takes a single i64 and returns a single i64 with 1 added to it: + +```rust +use std::sync::Arc; + +use arrow::array::{ArrayRef, Int64Array}; +use datafusion::common::Result; + +use datafusion::common::cast::as_int32_array; + +pub fn add_one(args: &[ArrayRef]) -> Result<ArrayRef> { + let i64s = as_int32_array(&args[0])?; + + let array = i64s + .iter() + .map(|sequence| match sequence { + Some(value) => Some(value + 1), + None => None, + }) + .collect::<Int64Array>(); + + Ok(Arc::new(array)) +} +``` + +For brevity, we'll skipped some error handling, but e.g. you may want to check that `args.len()` is the expected number of arguments. + +This "works" in isolation, i.e. if you have a slice of `ArrayRef`s, you can call `add_one` and it will return a new `ArrayRef` with 1 added to each value. + +```rust +let input = vec![Some(1), None, Some(3)]; +let input = Arc::new(Int64Array::from(input)) as ArrayRef; + +let result = add_one(&[input]).unwrap(); +let result = result.as_any().downcast_ref::<Int64Array>().unwrap(); + +assert_eq!(result, &Int64Array::from(vec![Some(2), None, Some(4)])); +``` + +The challenge however is that DataFusion doesn't know about this function. We need to register it with DataFusion so that it can be used in the context of a query. + +### Registering a Scalar UDF + +To register a Scalar UDF, you need to wrap the function implementation in a `ScalarUDF` struct and then register it with the `SessionContext`. DataFusion provides the `create_udf` and `make_scalar_function` helper functions to make this easier. + +```rust +let udf = create_udf( + "add_one", + vec![DataType::Int64], + Arc::new(DataType::Int64), + Volatility::Immutable, + make_scalar_function(add_one), +); +``` + +A few things to note: + +- The first argument is the name of the function. This is the name that will be used in SQL queries. +- The second argument is a vector of `DataType`s. This is the list of argument types that the function accepts. I.e. 
in this case, the function accepts a single `Int64` argument. +- The third argument is the return type of the function. I.e. in this case, the function returns an `Int64`. +- The fourth argument is the volatility of the function. This is an enum with three options: `Immutable`, `Stable`, and `Volatile`. This is used to determine if the function can be cached in some situations. In this case, the function is `Immutable` because it always returns the same value for the same input. A random number generator would be `Volatile` because it returns a different value for the same input. +- The fifth argument is the function implementation. This is the function that we defined above. + +That gives us a `ScalarUDF` that we can register with the `SessionContext`: + +```rust +let mut ctx = SessionContext::new(); + +ctx.register_udf(udf); +``` + +At this point, the following could is expected to work: + +```rust +let sql = "SELECT add_one(1)"; + +let df = ctx.sql(&sql).await.unwrap(); +``` + +## Adding a Window UDF + +Scalar UDFs are functions that take a row of data and return a single value. Window UDFs are similar, but they also have access to the rows around them. Access to the the proximal rows is helpful, but adds some complexity to the implementation. + +Body coming soon. Review Comment: 👍 -- I think it is fine to leave placeholder text here while we work on it
