I think B) is closer to what I was thinking. We may be using the term statically and dynamically typed a little differently -- I am sorry for the confusion. I have somewhat lost track of exactly what we are proposing and for that I apologize.
I propose a next step of sketching out a proposed API for DataFusion UDFs to implement, and circulate that around for commentary. I don't think I will have time to do this any time soon (unless it becomes directly important for the project I am working on) Thanks for taking the initiative on this, Andrew On Wed, Aug 19, 2020 at 2:29 PM Jorge Cardoso Leitão < jorgecarlei...@gmail.com> wrote: > Hi, > > Thank you for this enlightening discussion, Andrew! > > So, just to make sure I understood, are you proposing A), B) or something > else? > > A) we should not accept / declare polymorphic operations: all types should > be known based on the operation name (e.g. sum_f32, plus_f32, etc.) > B) we should continue to have "sum", "count", "+", etc. as polymorphic > operations, but we should not allow registering udfs as polymorphic, both > internally nor externally. I.e. all polymorphic operations are hard-coded. > > Let's assume A) first. I relate to the sentiment that Rust is statically > typed. However, as I see it, DataFusion is not: our main traits are > arrow::array::Array and RecordBatch, which are both dynamically typed (e.g. > Array::{data_type,as_any} and RecordBatch). Since all ops are also > dynamically typed (they receive Arc<Array> or RecordBatch) and use runtime > reflection via `match array.data_type()` at the physical level to downcast > Array to its respective native type, wouldn't A) lead to a major change in > DataFusion? > > Let's now assume B), and let me try to expand on your 3 points: > > 1. Once an operation in our plan is polymorphic, the whole plan is > polymorphic and the final schema can only be inferred from the initial > projection's schema / scan. A simple example of this using only functions > that we currently support is: > > df = scan([c1 float32, c2 float64, c3 float64]) > df = df.select(c1 * c2 as sum12, c1 * c3 as sum13) > df = df.aggregate(MIN(sum12), MIN(sum13)) > > The plan for this is that the first product returns a float32 (lower > precision of both), and the second returns a float64. MIN's return type now > depends on the first select's return type, which is in a previous node. So, > even if only our internal ops are polymorphic, this is sufficient to > require our optimizers to handle dynamically typed expressions and schemas > whose type is only known during planning (after the scan's schema is > known). > > 2. I relate to that sentiment. About the same time Andy proposed the (now > UDFs) dynamically typed UDFs, I made a 1k+ proposal for statically typed > UDFs. In retrospect, IMO dynamically typed UDFs are a far superior offering > as they offer an enormous flexibility and at no additional cost: we could > offer users an interface with fixed types only (e.g. via a macro), but, in > the end, all our memory structures are dynamic typed anyway (Array, > RecordBatch, etc.), and thus whether the user or us, a downcast will still > need to take place at runtime. > > 3. Users are still able to specify the type they want in query languages > that support polymorphic functions such as postgres, both at the query > level and on polymorphic UDFs. Most dialects support cast operations that > allow users to narrow types (::float in postgres, CAST(x AS float64) in > spark), that are only physically executed if needed. > > So, to summarize my thoughts so far: > > i) DataFusion is dynamically typed by design > ii) We already support dynamically typed scalar UDFs > iii) we currently have polymorphic functions (internally) and already have > to deal with them on our logical and physical plans. > iv) there is no practical limitation of supporting polymorphic UDFs, it is > a matter of whether the benefits outweigh the development and maintenance > costs. > > I am inclined to say that given i-iii), we should support polymorphic > (scalar and agg) UDFs, which would put us on the same level of UDF support > as postgres. However, we should offer a very easy interface for users to > register a non-polymorphic UDF, e.g. > > ctx.register(name, udf(callable, arg_types, return_type)?)? > > where udf returns the specialization of a generic UDF that expects N types > and returns return_type. > > Best, > Jorge > > > On Tue, Aug 18, 2020 at 6:52 PM Andrew Lamb <al...@influxdata.com> wrote: > > > It is my personal opinion that actual UDF functions registered with data > > fusion should take a known set of input types and single return type > (e.g. > > sum_i32 --> i32). I think this would: > > 1. Simplify the implementation of both the DataFusion optimizer and the > > UDFs > > 2. Make it easier for UDF writers as the UDF code would look more like > > Rust: the types would be clear from the function signatures, as is the > case > > in Rust in general > > 3. Give the user of SQL / DataFrames the ability to specifically specify > > what types they want > > > > If we wanted the ability for the user to specify `sum(i)` and let the > type > > coercion pass pick `sum_i32` or `sum_i64` depending on the input types, I > > recommend doing that at a different level than the UDF (perhaps via > > `register_alias("sum", "sum_i32)` or something), again for both clarity > of > > DataFusion implementation as well as UDF specification. > > > > Andrew > > > > On Mon, Aug 17, 2020 at 4:52 PM Jorge Cardoso Leitão < > > jorgecarlei...@gmail.com> wrote: > > > > > Thanks Andrew, > > > > > > I am not sure I articulated this well enough, though, as I did not > > specify > > > the type of polymorphism that I was thinking about. xD > > > > > > My question was/is about whether we should accept functions whose > return > > > type is known during planning, and constant during execution, or > whether > > > their return types must be constant both during planning and > execution. I > > > do not think we should support variable types during execution for the > > > reasons that you enumerated. If by runtime polymorphism you mean > changing > > > types during execution, then I very much agree with you that that is a > > > no-no. > > > > > > During planning, though, we have options: should we allow users to > write > > > something like `my_operation(f32|f64) -> (f32|f64)`, on which the type > is > > > inferred after we know the function's input in the logical plan, or > > should > > > we not allow that and require users to register `my_operation_f32(f32)` > > and > > > `my_operation_f64(f64)` separately? The three findings that I mentioned > > > above refer to planned polymorphism: return type is resolved during > > > planning (and constant during execution). > > > > > > The biggest use-case IMO for polymorphism during planning is for > > functions > > > that yield structures/lists of values (a-la collect_list) whose type > can > > > only be inferred after we know the functions' input type (array(f32) vs > > > array(f64)), and whose implementation can be generalized via a macro + > > > match. > > > > > > From a technical point of view, we currently have functions with > variable > > > types (all binary operators' return type depends on the lhs' type, sum, > > > max/min, etc.), and we have to handle the main planning challenges > > already. > > > In this context, the questions are something like: > > > > > > 1. should we continue to have them or should we move away from them? > > > 2.1 If not, what should we do with them? E.g. declare sum_i32, sum_i64, > > > etc., that have a single return type? > > > 2.2 if yes, show we allow users to register these types of functions, > or > > > should these only be allowed within DataFusion's code base? > > > > > > Best, > > > Jorge > > > > > > > > > > > > On Mon, Aug 17, 2020 at 9:53 PM Andrew Lamb <al...@influxdata.com> > > wrote: > > > > > > > In my opinion, I suggest we do not continue down the path of > (runtime) > > > > polymorphic functions unless a compelling use case for them can be > > > > articulated. > > > > > > > > You have done a great job articulating some of the implementation > > > > challenges, but I personally struggle to describe when, as a user of > > > > DataFusion, I would want to write a (runtime) polymorphic function. > > > > > > > > A function with runtime polymorphism I think would mean the UDF could > > > > handle the type changing *at runtime*: record batches could come in > > with > > > > multiple different types during the same execution. I can't think of > > > > examples where this behavior would be desirable or necessary. > > > > > > > > The existing DataFusion codebase seems to assume (reasonably in my > > > opinion) > > > > that the schema of each Logical / Physical plan node is known at > > planning > > > > time and it does not change at runtime. > > > > > > > > Most query optimizers (and compilers for that matter) take advantage > of > > > > plan (compile) time type information to make runtime more efficient. > > > Also, > > > > it seems like other database / runtime systems such as mysql[1] and > > > > postgres[2] require the UDF creator to explicitly specify the return > > type > > > > as well. I think we should consider the simpler semantics of "1 > return > > > type > > > > for each UDF" to make it easier on people writing UDFs as well as > > > > simplifying the implementation of DataFusion itself. > > > > > > > > Andrew > > > > > > > > [1] https://dev.mysql.com/doc/refman/8.0/en/create-function-udf.html > > > > [2] https://www.postgresql.org/docs/12/sql-createfunction.html > > > > > > > > On Mon, Aug 17, 2020 at 12:31 PM Jorge Cardoso Leitão < > > > > jorgecarlei...@gmail.com> wrote: > > > > > > > > > Hi, > > > > > > > > > > Recently, I have been contributing to DataFusion, and I would like > to > > > > bring > > > > > to your attention a question that I faced while PRing to DataFusion > > > that > > > > > IMO needs some alignment :) > > > > > > > > > > DataFusion supports scalar UDFs: functions that expect a type, > > return a > > > > > type, and performs some operation on the data (a-la spark UDF). > > > However, > > > > > the execution engine is actually dynamically typed: > > > > > > > > > > * a scalar UDF receives an &[ArrayRef] that must be downcasted > > > > accordingly > > > > > * a scalar UDF must select the builder that matches its signature, > so > > > > that > > > > > its return type matches the ArrayRef that it returns. > > > > > > > > > > This suggests that we can treat functions as polymorphic: as long > as > > > the > > > > > function handles the different types (e.g. via match), we are good. > > We > > > > > currently do not support multiple input types nor variable return > > types > > > > in > > > > > their function signatures. > > > > > > > > > > Our current (non-udf) scalar and aggregate functions are already > > > > > polymorphic on both their input and return type: sum(i32) -> i64, > > > > sum(f64) > > > > > -> f64, "a + b". I have been working on PRs to support polymorphic > > > > support > > > > > to scalar UDFs (e.g. sqrt() can take float32 and float64) [1,3], as > > > well > > > > as > > > > > polymorphic aggregate UDFs [2], so that we can extend our offering > to > > > > more > > > > > interesting functions such as "length(t) -> uint", "array(c1, c2)", > > > > > "collect_list(t) -> array(t)", etc. > > > > > > > > > > However, while working on [1,2,3], I reach some non-trivial > findings > > > > that I > > > > > would like to share: > > > > > > > > > > Finding 1: to support polymorphic functions, our logical and > physical > > > > > expressions (Expr and PhysicalExpr) need to be polymorphic as-well: > > > once > > > > a > > > > > function is polymorphic, any expression containing it is also > > > > polymorphic. > > > > > > > > > > Finding 2: when a polymorphic expression passes through our type > > > coercer > > > > > optimizer (that tries to coerce types to match a function's > > signature), > > > > it > > > > > may be re-casted to a different type. If the return type changes, > the > > > > > optimizer may need to re-cast operations dependent of the function > > call > > > > > (e.g. a projection followed by an aggregation may need a recast on > > the > > > > > projection and on the aggregation). > > > > > > > > > > Finding 3: when an expression passes through our type coercer > > optimizer > > > > and > > > > > is re-casted, its name changes (typically from "expr" to "CAST(expr > > as > > > > > X)"). This implies that a column referenced as #expr down the plan > > may > > > > not > > > > > exist depending on the input type of the initial projection/scan. > > > > > > > > > > Finding 1 and 2 IMO are a direct consequence of polymorphism and > the > > > only > > > > > way to not handle them is by not supporting polymorphism (e.g. the > > user > > > > > registers sqrt_f32 and sqrt_f64, etc). > > > > > > > > > > Finding 3 can be addressed in at least three ways: > > > > > > > > > > A) make the optimizer rewrite the expression as "CAST(expr as X) AS > > > > expr", > > > > > so that it retains its original name. This hides the actual > > > expression's > > > > > calculation, but preserves its original name. > > > > > B) accept that expressions can always change its name, which means > > that > > > > the > > > > > user should be mindful when writing `col("SELECT sqrt(x) FROM t"`, > as > > > the > > > > > column name may end up being called `"sqrt(CAST(x as X))"`. > > > > > C) Do not support polymorphic functions > > > > > > > > > > Note that we currently already experience effects 1-3, it is just > > that > > > we > > > > > use so few polymorphic functions that these seldomly present > > > themselves. > > > > It > > > > > was while working on [1,2,3] that I start painting the bigger > > picture. > > > > > > > > > > Some questions: > > > > > 1. should continue down the path of polymorphic functions? > > > > > 2. if yes, how do handle finding 3? > > > > > > > > > > Looking at the current code base, I am confident that we can > address > > > the > > > > > technical issues to support polymorphic functions. However, it > would > > be > > > > > interesting to have your thoughts on this. > > > > > > > > > > [1] https://github.com/apache/arrow/pull/7967 > > > > > [2] https://github.com/apache/arrow/pull/7971 > > > > > [3] https://github.com/apache/arrow/pull/7974 > > > > > > > > > > > > > > >