This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new e1b992a787 Add `field` trait method to `WindowUDFImpl`, remove `return_type`/`nullable` (#12374)
e1b992a787 is described below
commit e1b992a7878e78e8a63b7e24425c665727bda493
Author: jcsherin <[email protected]>
AuthorDate: Sat Sep 21 16:53:08 2024 +0530
Add `field` trait method to `WindowUDFImpl`, remove `return_type`/`nullable` (#12374)
* Adds new library `functions-window-common`
* Adds `FieldArgs` struct for field of final result
* Adds `field` method to `WindowUDFImpl` trait
* Minor: fixes formatting
* Fixes: udwf doc test
* Fixes: implements missing trait items
* Updates `datafusion-cli` dependencies
* Fixes: formatting of `Cargo.toml` files
* Fixes: implementation of `field` in udwf example
* Pass `FieldArgs` argument to `field`
* Use `field` in place of `return_type` for udwf
* Update `field` in udwf implementations
* Fixes: implementation of `field` in udwf example
* Revert unrelated change
* Mark `return_type` for udwf as unreachable
* Delete code
* Uses schema name of udwf to construct `FieldArgs`
* Adds deprecated notice to `return_type` trait method
* Add doc comments to `field` trait method
* Reify `input_types` when creating the udwf window expression
* Rename name field to `schema_name` in `FieldArgs`
* Make `FieldArgs` opaque
* Minor refactor
* Removes `nullable` trait method from `WindowUDFImpl`
* Add doc comments
* Rename to `WindowUDFResultArgs`
* Minor: fixes formatting
* Copy edits for doc comments
* Renames field to `function_name`
* Rename struct to `WindowUDFFieldArgs`
* Add comments for unreachable code
* Copy edit for `WindowUDFImpl::field` trait method
* Renames module
* Fix warning: unused doc comment
* Minor: rename bindings
* Minor refactor
* Minor: copy edit
* Fixes: use `Expr::qualified_name` for window function name
* Fixes: apply previous fix to `Expr::nullable`
* Refactor: reuse type coercion for window functions
* Fixes: clippy errors
* Adds name parameter to `WindowFunctionDefinition::return_type`
* Removes `return_type` field from `SimpleWindowUDF`
* Add doc comment for helper method
* Rewrite doc comments
* Minor: remove empty comment
* Remove `WindowUDFImpl::return_type`
* Fixes doc test
---
Cargo.toml | 2 +
datafusion-cli/Cargo.lock | 10 ++
datafusion-examples/examples/advanced_udwf.rs | 11 +-
.../examples/simplify_udwf_expression.rs | 12 +-
datafusion/core/Cargo.toml | 1 +
.../user_defined/user_defined_window_functions.rs | 14 +--
datafusion/expr/Cargo.toml | 1 +
datafusion/expr/src/expr.rs | 32 ++---
datafusion/expr/src/expr_fn.rs | 13 +-
datafusion/expr/src/expr_schema.rs | 140 ++++++++++++---------
datafusion/expr/src/function.rs | 2 +
datafusion/expr/src/udwf.rs | 68 ++++------
.../Cargo.toml | 18 +--
datafusion/functions-window-common/README.md | 26 ++++
datafusion/functions-window-common/src/field.rs | 64 ++++++++++
datafusion/functions-window-common/src/lib.rs | 21 ++++
datafusion/functions-window/Cargo.toml | 1 +
datafusion/functions-window/src/row_number.rs | 11 +-
datafusion/optimizer/Cargo.toml | 1 +
.../src/simplify_expressions/expr_simplifier.rs | 9 +-
datafusion/physical-plan/Cargo.toml | 1 +
datafusion/physical-plan/src/windows/mod.rs | 19 ++-
datafusion/proto/Cargo.toml | 1 +
.../proto/tests/cases/roundtrip_logical_plan.rs | 24 ++--
24 files changed, 322 insertions(+), 180 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
index c80297a1f5..e8cd52315a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,6 +31,7 @@ members = [
"datafusion/functions-aggregate-common",
"datafusion/functions-nested",
"datafusion/functions-window",
+ "datafusion/functions-window-common",
"datafusion/optimizer",
"datafusion/physical-expr",
"datafusion/physical-expr-common",
@@ -103,6 +104,7 @@ datafusion-functions-aggregate = { path =
"datafusion/functions-aggregate", vers
datafusion-functions-aggregate-common = { path =
"datafusion/functions-aggregate-common", version = "42.0.0" }
datafusion-functions-nested = { path = "datafusion/functions-nested", version
= "42.0.0" }
datafusion-functions-window = { path = "datafusion/functions-window", version
= "42.0.0" }
+datafusion-functions-window-common = { path =
"datafusion/functions-window-common", version = "42.0.0" }
datafusion-optimizer = { path = "datafusion/optimizer", version = "42.0.0",
default-features = false }
datafusion-physical-expr = { path = "datafusion/physical-expr", version =
"42.0.0", default-features = false }
datafusion-physical-expr-common = { path = "datafusion/physical-expr-common",
version = "42.0.0", default-features = false }
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 15a2b14ec3..fbe7d5c04b 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1343,6 +1343,7 @@ dependencies = [
"datafusion-common",
"datafusion-expr-common",
"datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
"datafusion-physical-expr-common",
"paste",
"serde_json",
@@ -1443,10 +1444,18 @@ version = "42.0.0"
dependencies = [
"datafusion-common",
"datafusion-expr",
+ "datafusion-functions-window-common",
"datafusion-physical-expr-common",
"log",
]
+[[package]]
+name = "datafusion-functions-window-common"
+version = "42.0.0"
+dependencies = [
+ "datafusion-common",
+]
+
[[package]]
name = "datafusion-optimizer"
version = "42.0.0"
@@ -1537,6 +1546,7 @@ dependencies = [
"datafusion-expr",
"datafusion-functions-aggregate",
"datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
"datafusion-physical-expr",
"datafusion-physical-expr-common",
"futures",
diff --git a/datafusion-examples/examples/advanced_udwf.rs
b/datafusion-examples/examples/advanced_udwf.rs
index ec0318a561..fd1b84070c 100644
--- a/datafusion-examples/examples/advanced_udwf.rs
+++ b/datafusion-examples/examples/advanced_udwf.rs
@@ -22,9 +22,11 @@ use arrow::{
array::{ArrayRef, AsArray, Float64Array},
datatypes::Float64Type,
};
+use arrow_schema::Field;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::ScalarValue;
+use datafusion_expr::function::WindowUDFFieldArgs;
use datafusion_expr::{
PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
};
@@ -70,16 +72,15 @@ impl WindowUDFImpl for SmoothItUdf {
&self.signature
}
- /// What is the type of value that will be returned by this function.
- fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
- Ok(DataType::Float64)
- }
-
/// Create a `PartitionEvaluator` to evaluate this function on a new
/// partition.
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(MyPartitionEvaluator::new()))
}
+
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ Ok(Field::new(field_args.name(), DataType::Float64, true))
+ }
}
/// This implements the lowest level evaluation for a window function
diff --git a/datafusion-examples/examples/simplify_udwf_expression.rs
b/datafusion-examples/examples/simplify_udwf_expression.rs
index a17e45dba2..1ff629eef1 100644
--- a/datafusion-examples/examples/simplify_udwf_expression.rs
+++ b/datafusion-examples/examples/simplify_udwf_expression.rs
@@ -17,12 +17,12 @@
use std::any::Any;
-use arrow_schema::DataType;
+use arrow_schema::{DataType, Field};
use datafusion::execution::context::SessionContext;
use datafusion::functions_aggregate::average::avg_udaf;
use datafusion::{error::Result, execution::options::CsvReadOptions};
-use datafusion_expr::function::WindowFunctionSimplification;
+use datafusion_expr::function::{WindowFunctionSimplification,
WindowUDFFieldArgs};
use datafusion_expr::{
expr::WindowFunction, simplify::SimplifyInfo, Expr, PartitionEvaluator,
Signature,
Volatility, WindowUDF, WindowUDFImpl,
@@ -60,10 +60,6 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
&self.signature
}
- fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
- Ok(DataType::Float64)
- }
-
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
todo!()
}
@@ -84,6 +80,10 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
Some(Box::new(simplify))
}
+
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ Ok(Field::new(field_args.name(), DataType::Float64, true))
+ }
}
// create local execution context with `cars.csv` registered as a table named
`cars`
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 82a799f858..01ba90ee5d 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -145,6 +145,7 @@ bigdecimal = { workspace = true }
criterion = { version = "0.5", features = ["async_tokio"] }
csv = "1.1.6"
ctor = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
doc-comment = { workspace = true }
env_logger = { workspace = true }
half = { workspace = true, default-features = true }
diff --git
a/datafusion/core/tests/user_defined/user_defined_window_functions.rs
b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
index 3c607301fc..d96bb23953 100644
--- a/datafusion/core/tests/user_defined/user_defined_window_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
@@ -29,12 +29,13 @@ use std::{
use arrow::array::AsArray;
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
-use arrow_schema::DataType;
+use arrow_schema::{DataType, Field};
use datafusion::{assert_batches_eq, prelude::SessionContext};
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{
PartitionEvaluator, Signature, Volatility, WindowUDF, WindowUDFImpl,
};
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
/// A query with a window function evaluated over the entire partition
const UNBOUNDED_WINDOW_QUERY: &str = "SELECT x, y, val, \
@@ -522,7 +523,6 @@ impl OddCounter {
#[derive(Debug, Clone)]
struct SimpleWindowUDF {
signature: Signature,
- return_type: DataType,
test_state: Arc<TestState>,
aliases: Vec<String>,
}
@@ -531,10 +531,8 @@ impl OddCounter {
fn new(test_state: Arc<TestState>) -> Self {
let signature =
Signature::exact(vec![DataType::Float64],
Volatility::Immutable);
- let return_type = DataType::Int64;
Self {
signature,
- return_type,
test_state,
aliases: vec!["odd_counter_alias".to_string()],
}
@@ -554,10 +552,6 @@ impl OddCounter {
&self.signature
}
- fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType>
{
- Ok(self.return_type.clone())
- }
-
fn partition_evaluator(&self) -> Result<Box<dyn
PartitionEvaluator>> {
Ok(Box::new(OddCounter::new(Arc::clone(&self.test_state))))
}
@@ -565,6 +559,10 @@ impl OddCounter {
fn aliases(&self) -> &[String] {
&self.aliases
}
+
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ Ok(Field::new(field_args.name(), DataType::Int64, true))
+ }
}
ctx.register_udwf(WindowUDF::from(SimpleWindowUDF::new(test_state)))
diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml
index b5d34d9a38..55387fea22 100644
--- a/datafusion/expr/Cargo.toml
+++ b/datafusion/expr/Cargo.toml
@@ -46,6 +46,7 @@ chrono = { workspace = true }
datafusion-common = { workspace = true }
datafusion-expr-common = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
paste = "^1.0"
serde_json = { workspace = true }
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 8cb759b881..c141324962 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -40,6 +40,7 @@ use datafusion_common::tree_node::{
use datafusion_common::{
plan_err, Column, DFSchema, Result, ScalarValue, TableReference,
};
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use sqlparser::ast::{
display_comma_separated, ExceptSelectItem, ExcludeSelectItem,
IlikeSelectItem,
NullTreatment, RenameSelectItem, ReplaceSelectElement,
@@ -706,6 +707,7 @@ impl WindowFunctionDefinition {
&self,
input_expr_types: &[DataType],
_input_expr_nullable: &[bool],
+ display_name: &str,
) -> Result<DataType> {
match self {
WindowFunctionDefinition::BuiltInWindowFunction(fun) => {
@@ -714,7 +716,9 @@ impl WindowFunctionDefinition {
WindowFunctionDefinition::AggregateUDF(fun) => {
fun.return_type(input_expr_types)
}
- WindowFunctionDefinition::WindowUDF(fun) =>
fun.return_type(input_expr_types),
+ WindowFunctionDefinition::WindowUDF(fun) => fun
+ .field(WindowUDFFieldArgs::new(input_expr_types, display_name))
+ .map(|field| field.data_type().clone()),
}
}
@@ -2536,10 +2540,10 @@ mod test {
#[test]
fn test_first_value_return_type() -> Result<()> {
let fun = find_df_window_func("first_value").unwrap();
- let observed = fun.return_type(&[DataType::Utf8], &[true])?;
+ let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
assert_eq!(DataType::Utf8, observed);
- let observed = fun.return_type(&[DataType::UInt64], &[true])?;
+ let observed = fun.return_type(&[DataType::UInt64], &[true], "")?;
assert_eq!(DataType::UInt64, observed);
Ok(())
@@ -2548,10 +2552,10 @@ mod test {
#[test]
fn test_last_value_return_type() -> Result<()> {
let fun = find_df_window_func("last_value").unwrap();
- let observed = fun.return_type(&[DataType::Utf8], &[true])?;
+ let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
assert_eq!(DataType::Utf8, observed);
- let observed = fun.return_type(&[DataType::Float64], &[true])?;
+ let observed = fun.return_type(&[DataType::Float64], &[true], "")?;
assert_eq!(DataType::Float64, observed);
Ok(())
@@ -2560,10 +2564,10 @@ mod test {
#[test]
fn test_lead_return_type() -> Result<()> {
let fun = find_df_window_func("lead").unwrap();
- let observed = fun.return_type(&[DataType::Utf8], &[true])?;
+ let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
assert_eq!(DataType::Utf8, observed);
- let observed = fun.return_type(&[DataType::Float64], &[true])?;
+ let observed = fun.return_type(&[DataType::Float64], &[true], "")?;
assert_eq!(DataType::Float64, observed);
Ok(())
@@ -2572,10 +2576,10 @@ mod test {
#[test]
fn test_lag_return_type() -> Result<()> {
let fun = find_df_window_func("lag").unwrap();
- let observed = fun.return_type(&[DataType::Utf8], &[true])?;
+ let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
assert_eq!(DataType::Utf8, observed);
- let observed = fun.return_type(&[DataType::Float64], &[true])?;
+ let observed = fun.return_type(&[DataType::Float64], &[true], "")?;
assert_eq!(DataType::Float64, observed);
Ok(())
@@ -2585,11 +2589,11 @@ mod test {
fn test_nth_value_return_type() -> Result<()> {
let fun = find_df_window_func("nth_value").unwrap();
let observed =
- fun.return_type(&[DataType::Utf8, DataType::UInt64], &[true,
true])?;
+ fun.return_type(&[DataType::Utf8, DataType::UInt64], &[true,
true], "")?;
assert_eq!(DataType::Utf8, observed);
let observed =
- fun.return_type(&[DataType::Float64, DataType::UInt64], &[true,
true])?;
+ fun.return_type(&[DataType::Float64, DataType::UInt64], &[true,
true], "")?;
assert_eq!(DataType::Float64, observed);
Ok(())
@@ -2598,7 +2602,7 @@ mod test {
#[test]
fn test_percent_rank_return_type() -> Result<()> {
let fun = find_df_window_func("percent_rank").unwrap();
- let observed = fun.return_type(&[], &[])?;
+ let observed = fun.return_type(&[], &[], "")?;
assert_eq!(DataType::Float64, observed);
Ok(())
@@ -2607,7 +2611,7 @@ mod test {
#[test]
fn test_cume_dist_return_type() -> Result<()> {
let fun = find_df_window_func("cume_dist").unwrap();
- let observed = fun.return_type(&[], &[])?;
+ let observed = fun.return_type(&[], &[], "")?;
assert_eq!(DataType::Float64, observed);
Ok(())
@@ -2616,7 +2620,7 @@ mod test {
#[test]
fn test_ntile_return_type() -> Result<()> {
let fun = find_df_window_func("ntile").unwrap();
- let observed = fun.return_type(&[DataType::Int16], &[true])?;
+ let observed = fun.return_type(&[DataType::Int16], &[true], "")?;
assert_eq!(DataType::UInt64, observed);
Ok(())
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 5fd3177bc2..2975e36488 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -38,6 +38,7 @@ use arrow::compute::kernels::cast_utils::{
};
use arrow::datatypes::{DataType, Field};
use datafusion_common::{plan_err, Column, Result, ScalarValue, TableReference};
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use sqlparser::ast::NullTreatment;
use std::any::Any;
use std::fmt::Debug;
@@ -657,13 +658,17 @@ impl WindowUDFImpl for SimpleWindowUDF {
&self.signature
}
- fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
- Ok(self.return_type.clone())
- }
-
fn partition_evaluator(&self) -> Result<Box<dyn
crate::PartitionEvaluator>> {
(self.partition_evaluator_factory)()
}
+
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ Ok(Field::new(
+ field_args.name(),
+ self.return_type.clone(),
+ true,
+ ))
+ }
}
pub fn interval_year_month_lit(value: &str) -> Expr {
diff --git a/datafusion/expr/src/expr_schema.rs
b/datafusion/expr/src/expr_schema.rs
index 598d172d30..f40ac409dd 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -31,6 +31,7 @@ use datafusion_common::{
not_impl_err, plan_datafusion_err, plan_err, Column, ExprSchema, Result,
TableReference,
};
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use std::collections::HashMap;
use std::sync::Arc;
@@ -166,49 +167,9 @@ impl ExprSchemable for Expr {
// expressiveness of `TypeSignature`), then infer return type
Ok(func.return_type_from_exprs(args, schema, &arg_data_types)?)
}
- Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
- let data_types = args
- .iter()
- .map(|e| e.get_type(schema))
- .collect::<Result<Vec<_>>>()?;
- let nullability = args
- .iter()
- .map(|e| e.nullable(schema))
- .collect::<Result<Vec<_>>>()?;
- match fun {
- WindowFunctionDefinition::AggregateUDF(udf) => {
- let new_types =
data_types_with_aggregate_udf(&data_types, udf)
- .map_err(|err| {
- plan_datafusion_err!(
- "{} {}",
- err,
- utils::generate_signature_error_msg(
- fun.name(),
- fun.signature(),
- &data_types
- )
- )
- })?;
- Ok(fun.return_type(&new_types, &nullability)?)
- }
- WindowFunctionDefinition::WindowUDF(udwf) => {
- let new_types =
data_types_with_window_udf(&data_types, udwf)
- .map_err(|err| {
- plan_datafusion_err!(
- "{} {}",
- err,
- utils::generate_signature_error_msg(
- fun.name(),
- fun.signature(),
- &data_types
- )
- )
- })?;
- Ok(fun.return_type(&new_types, &nullability)?)
- }
- _ => fun.return_type(&data_types, &nullability),
- }
- }
+ Expr::WindowFunction(window_function) => self
+ .data_type_and_nullable_with_window_function(schema,
window_function)
+ .map(|(return_type, _)| return_type),
Expr::AggregateFunction(AggregateFunction { func, args, .. }) => {
let data_types = args
.iter()
@@ -340,20 +301,12 @@ impl ExprSchemable for Expr {
Expr::AggregateFunction(AggregateFunction { func, .. }) => {
Ok(func.is_nullable())
}
- Expr::WindowFunction(WindowFunction { fun, .. }) => match fun {
- WindowFunctionDefinition::BuiltInWindowFunction(func) => {
- if func.name() == "RANK"
- || func.name() == "NTILE"
- || func.name() == "CUME_DIST"
- {
- Ok(false)
- } else {
- Ok(true)
- }
- }
- WindowFunctionDefinition::AggregateUDF(func) =>
Ok(func.is_nullable()),
- WindowFunctionDefinition::WindowUDF(udwf) =>
Ok(udwf.nullable()),
- },
+ Expr::WindowFunction(window_function) => self
+ .data_type_and_nullable_with_window_function(
+ input_schema,
+ window_function,
+ )
+ .map(|(_, nullable)| nullable),
Expr::ScalarVariable(_, _)
| Expr::TryCast { .. }
| Expr::Unnest(_)
@@ -450,6 +403,9 @@ impl ExprSchemable for Expr {
let right = right.data_type_and_nullable(schema)?;
Ok((get_result_type(&left.0, op, &right.0)?, left.1 ||
right.1))
}
+ Expr::WindowFunction(window_function) => {
+ self.data_type_and_nullable_with_window_function(schema,
window_function)
+ }
_ => Ok((self.get_type(schema)?, self.nullable(schema)?)),
}
}
@@ -499,6 +455,76 @@ impl ExprSchemable for Expr {
}
}
+impl Expr {
+ /// Common method for window functions that applies type coercion
+ /// to all arguments of the window function to check if it matches
+ /// its signature.
+ ///
+ /// If successful, this method returns the data type and
+ /// nullability of the window function's result.
+ ///
+ /// Otherwise, returns an error if there's a type mismatch between
+ /// the window function's signature and the provided arguments.
+ fn data_type_and_nullable_with_window_function(
+ &self,
+ schema: &dyn ExprSchema,
+ window_function: &WindowFunction,
+ ) -> Result<(DataType, bool)> {
+ let WindowFunction { fun, args, .. } = window_function;
+
+ let data_types = args
+ .iter()
+ .map(|e| e.get_type(schema))
+ .collect::<Result<Vec<_>>>()?;
+ match fun {
+ WindowFunctionDefinition::BuiltInWindowFunction(window_fun) => {
+ let return_type = window_fun.return_type(&data_types)?;
+ let nullable =
+ !["RANK", "NTILE",
"CUME_DIST"].contains(&window_fun.name());
+ Ok((return_type, nullable))
+ }
+ WindowFunctionDefinition::AggregateUDF(udaf) => {
+ let new_types = data_types_with_aggregate_udf(&data_types,
udaf)
+ .map_err(|err| {
+ plan_datafusion_err!(
+ "{} {}",
+ err,
+ utils::generate_signature_error_msg(
+ fun.name(),
+ fun.signature(),
+ &data_types
+ )
+ )
+ })?;
+
+ let return_type = udaf.return_type(&new_types)?;
+ let nullable = udaf.is_nullable();
+
+ Ok((return_type, nullable))
+ }
+ WindowFunctionDefinition::WindowUDF(udwf) => {
+ let new_types =
+ data_types_with_window_udf(&data_types,
udwf).map_err(|err| {
+ plan_datafusion_err!(
+ "{} {}",
+ err,
+ utils::generate_signature_error_msg(
+ fun.name(),
+ fun.signature(),
+ &data_types
+ )
+ )
+ })?;
+ let (_, function_name) = self.qualified_name();
+ let field_args = WindowUDFFieldArgs::new(&new_types,
&function_name);
+
+ udwf.field(field_args)
+ .map(|field| (field.data_type().clone(),
field.is_nullable()))
+ }
+ }
+ }
+}
+
/// cast subquery in InSubquery/ScalarSubquery to a given type.
pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) ->
Result<Subquery> {
if subquery.subquery.schema().field(0).data_type() == cast_to_type {
diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs
index cd7a0c8aa9..9814d16ddf 100644
--- a/datafusion/expr/src/function.rs
+++ b/datafusion/expr/src/function.rs
@@ -27,6 +27,8 @@ pub use datafusion_functions_aggregate_common::accumulator::{
AccumulatorArgs, AccumulatorFactoryFunction, StateFieldsArgs,
};
+pub use datafusion_functions_window_common::field::WindowUDFFieldArgs;
+
#[derive(Debug, Clone, Copy)]
pub enum Hint {
/// Indicates the argument needs to be padded if it is scalar
diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs
index fc4432ffdf..7cc57523a1 100644
--- a/datafusion/expr/src/udwf.rs
+++ b/datafusion/expr/src/udwf.rs
@@ -18,7 +18,6 @@
//! [`WindowUDF`]: User Defined Window Functions
use arrow::compute::SortOptions;
-use arrow::datatypes::DataType;
use std::cmp::Ordering;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::{
@@ -27,7 +26,10 @@ use std::{
sync::Arc,
};
+use arrow::datatypes::{DataType, Field};
+
use datafusion_common::{not_impl_err, Result};
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use crate::expr::WindowFunction;
use crate::{
@@ -139,13 +141,6 @@ impl WindowUDF {
self.inner.signature()
}
- /// Return the type of the function given its input types
- ///
- /// See [`WindowUDFImpl::return_type`] for more details.
- pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
- self.inner.return_type(args)
- }
-
/// Do the function rewrite
///
/// See [`WindowUDFImpl::simplify`] for more details.
@@ -158,11 +153,11 @@ impl WindowUDF {
self.inner.partition_evaluator()
}
- /// Returns if column values are nullable for this window function.
+ /// Returns the field of the final result of evaluating this window
function.
///
- /// See [`WindowUDFImpl::nullable`] for more details.
- pub fn nullable(&self) -> bool {
- self.inner.nullable()
+ /// See [`WindowUDFImpl::field`] for more details.
+ pub fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ self.inner.field(field_args)
}
/// Returns custom result ordering introduced by this window function
@@ -201,10 +196,11 @@ where
/// # Basic Example
/// ```
/// # use std::any::Any;
-/// # use arrow::datatypes::DataType;
+/// # use arrow::datatypes::{DataType, Field};
/// # use datafusion_common::{DataFusionError, plan_err, Result};
/// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator,
WindowFrame, ExprFunctionExt};
/// # use datafusion_expr::{WindowUDFImpl, WindowUDF};
+/// use datafusion_functions_window_common::field::WindowUDFFieldArgs;
/// #[derive(Debug, Clone)]
/// struct SmoothIt {
/// signature: Signature
@@ -223,14 +219,15 @@ where
/// fn as_any(&self) -> &dyn Any { self }
/// fn name(&self) -> &str { "smooth_it" }
/// fn signature(&self) -> &Signature { &self.signature }
-/// fn return_type(&self, args: &[DataType]) -> Result<DataType> {
-/// if !matches!(args.get(0), Some(&DataType::Int32)) {
-/// return plan_err!("smooth_it only accepts Int32 arguments");
-/// }
-/// Ok(DataType::Int32)
-/// }
/// // The actual implementation would add one to the argument
/// fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
unimplemented!() }
+/// fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+/// if let Some(DataType::Int32) = field_args.get_input_type(0) {
+/// Ok(Field::new(field_args.name(), DataType::Int32, false))
+/// } else {
+/// plan_err!("smooth_it only accepts Int32 arguments")
+/// }
+/// }
/// }
///
/// // Create a new WindowUDF from the implementation
@@ -259,10 +256,6 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
/// types are accepted and the function's Volatility.
fn signature(&self) -> &Signature;
- /// What [`DataType`] will be returned by this function, given the types of
- /// the arguments
- fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
-
/// Invoke the function, returning the [`PartitionEvaluator`] instance
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;
@@ -324,14 +317,8 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
hasher.finish()
}
- /// Allows customizing nullable of column for this window UDF.
- ///
- /// By default, the final result of evaluating the window UDF is
- /// allowed to have null values. But if that is not the case then
- /// it can be customized in the window UDF implementation.
- fn nullable(&self) -> bool {
- true
- }
+ /// The [`Field`] of the final result of evaluating this window function.
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field>;
/// Allows the window UDF to define a custom result ordering.
///
@@ -414,10 +401,6 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
self.inner.signature()
}
- fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
- self.inner.return_type(arg_types)
- }
-
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
self.inner.partition_evaluator()
}
@@ -445,8 +428,8 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
hasher.finish()
}
- fn nullable(&self) -> bool {
- self.inner.nullable()
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ self.inner.field(field_args)
}
fn sort_options(&self) -> Option<SortOptions> {
@@ -461,9 +444,10 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
#[cfg(test)]
mod test {
use crate::{PartitionEvaluator, WindowUDF, WindowUDFImpl};
- use arrow::datatypes::DataType;
+ use arrow::datatypes::{DataType, Field};
use datafusion_common::Result;
use datafusion_expr_common::signature::{Signature, Volatility};
+ use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use std::any::Any;
use std::cmp::Ordering;
@@ -495,10 +479,10 @@ mod test {
fn signature(&self) -> &Signature {
&self.signature
}
- fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
+ fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
unimplemented!()
}
- fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+ fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<Field> {
unimplemented!()
}
}
@@ -531,10 +515,10 @@ mod test {
fn signature(&self) -> &Signature {
&self.signature
}
- fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
+ fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
unimplemented!()
}
- fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+ fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<Field> {
unimplemented!()
}
}
diff --git a/datafusion/functions-window/Cargo.toml
b/datafusion/functions-window-common/Cargo.toml
similarity index 80%
copy from datafusion/functions-window/Cargo.toml
copy to datafusion/functions-window-common/Cargo.toml
index 94dd421284..98b6f8c6db 100644
--- a/datafusion/functions-window/Cargo.toml
+++ b/datafusion/functions-window-common/Cargo.toml
@@ -16,32 +16,26 @@
# under the License.
[package]
-name = "datafusion-functions-window"
-description = "Window function packages for the DataFusion query engine"
+name = "datafusion-functions-window-common"
+description = "Common functions for implementing user-defined window functions
for the DataFusion query engine"
keywords = ["datafusion", "logical", "plan", "expressions"]
readme = "README.md"
-version = { workspace = true }
+authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
-repository = { workspace = true }
license = { workspace = true }
-authors = { workspace = true }
+repository = { workspace = true }
rust-version = { workspace = true }
+version = { workspace = true }
[lints]
workspace = true
[lib]
-name = "datafusion_functions_window"
+name = "datafusion_functions_window_common"
path = "src/lib.rs"
# See more keys and their definitions at
https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
datafusion-common = { workspace = true }
-datafusion-expr = { workspace = true }
-datafusion-physical-expr-common = { workspace = true }
-log = { workspace = true }
-
-[dev-dependencies]
-arrow = { workspace = true }
diff --git a/datafusion/functions-window-common/README.md
b/datafusion/functions-window-common/README.md
new file mode 100644
index 0000000000..de12d25f97
--- /dev/null
+++ b/datafusion/functions-window-common/README.md
@@ -0,0 +1,26 @@
+<!---
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+
+# DataFusion Window Function Common Library
+
+[DataFusion][df] is an extensible query execution framework, written in Rust,
that uses Apache Arrow as its in-memory format.
+
+This crate contains common functions for implementing user-defined window
functions.
+
+[df]: https://crates.io/crates/datafusion
diff --git a/datafusion/functions-window-common/src/field.rs
b/datafusion/functions-window-common/src/field.rs
new file mode 100644
index 0000000000..8011b7b0f0
--- /dev/null
+++ b/datafusion/functions-window-common/src/field.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::arrow::datatypes::DataType;
+
+/// Metadata for defining the result field from evaluating a
+/// user-defined window function; passed to `WindowUDFImpl::field`.
+pub struct WindowUDFFieldArgs<'a> {
+ /// The data types corresponding to the arguments to the
+ /// user-defined window function, in argument order.
+ input_types: &'a [DataType],
+ /// Display name of the window function; also the result field name.
+ display_name: &'a str,
+}
+
+impl<'a> WindowUDFFieldArgs<'a> {
+ /// Create an instance of [`WindowUDFFieldArgs`].
+ ///
+ /// # Arguments
+ ///
+ /// * `input_types` - The data types corresponding to the
+ /// arguments to the user-defined window function.
+ /// * `display_name` - The display name of the user-defined window
+ /// function expression; returned by [`Self::name`].
+ ///
+ pub fn new(input_types: &'a [DataType], display_name: &'a str) -> Self {
+ WindowUDFFieldArgs {
+ input_types,
+ display_name,
+ }
+ }
+
+ /// Returns the data types of the input expressions passed as
+ /// arguments (borrowed for `'a`, independent of `&self`).
+ pub fn input_types(&self) -> &'a [DataType] {
+ self.input_types
+ }
+
+ /// Returns the name for the field of the final result of evaluating
+ /// the user-defined window function (borrowed for `'a`).
+ pub fn name(&self) -> &'a str {
+ self.display_name
+ }
+
+ /// Returns `Some(DataType)` of input expression at index, otherwise
+ /// returns `None` if the index is out of bounds.
+ pub fn get_input_type(&self, index: usize) -> Option<DataType> {
+ self.input_types.get(index).cloned()
+ }
+}
diff --git a/datafusion/functions-window-common/src/lib.rs
b/datafusion/functions-window-common/src/lib.rs
new file mode 100644
index 0000000000..2e4bcbbc83
--- /dev/null
+++ b/datafusion/functions-window-common/src/lib.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Common user-defined window functionality for [DataFusion]
+//!
+//! [DataFusion]: <https://crates.io/crates/datafusion>
+pub mod field;
diff --git a/datafusion/functions-window/Cargo.toml
b/datafusion/functions-window/Cargo.toml
index 94dd421284..8dcec6bc96 100644
--- a/datafusion/functions-window/Cargo.toml
+++ b/datafusion/functions-window/Cargo.toml
@@ -40,6 +40,7 @@ path = "src/lib.rs"
[dependencies]
datafusion-common = { workspace = true }
datafusion-expr = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
log = { workspace = true }
diff --git a/datafusion/functions-window/src/row_number.rs
b/datafusion/functions-window/src/row_number.rs
index 43d2796ad7..7f348bf9d2 100644
--- a/datafusion/functions-window/src/row_number.rs
+++ b/datafusion/functions-window/src/row_number.rs
@@ -25,9 +25,12 @@ use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::array::UInt64Array;
use datafusion_common::arrow::compute::SortOptions;
use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::arrow::datatypes::Field;
use datafusion_common::{Result, ScalarValue};
use datafusion_expr::expr::WindowFunction;
use datafusion_expr::{Expr, PartitionEvaluator, Signature, Volatility,
WindowUDFImpl};
+use datafusion_functions_window_common::field;
+use field::WindowUDFFieldArgs;
/// Create a [`WindowFunction`](Expr::WindowFunction) expression for
/// `row_number` user-defined window function.
@@ -84,16 +87,12 @@ impl WindowUDFImpl for RowNumber {
&self.signature
}
- fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
- Ok(DataType::UInt64)
- }
-
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::<NumRowsEvaluator>::default())
}
- fn nullable(&self) -> bool {
- false
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ Ok(Field::new(field_args.name(), DataType::UInt64, false))
}
fn sort_options(&self) -> Option<SortOptions> {
diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml
index 1a9e9630c0..337a24ffae 100644
--- a/datafusion/optimizer/Cargo.toml
+++ b/datafusion/optimizer/Cargo.toml
@@ -57,5 +57,6 @@ regex-syntax = "0.8.0"
arrow-buffer = { workspace = true }
ctor = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
datafusion-sql = { workspace = true }
env_logger = { workspace = true }
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index fc3921d296..a78a54a571 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -1798,6 +1798,7 @@ mod tests {
interval_arithmetic::Interval,
*,
};
+ use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use std::{
collections::HashMap,
ops::{BitAnd, BitOr, BitXor},
@@ -3901,10 +3902,6 @@ mod tests {
unimplemented!()
}
- fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
- unimplemented!("not needed for tests")
- }
-
fn simplify(&self) -> Option<WindowFunctionSimplification> {
if self.simplify {
Some(Box::new(|_, _| Ok(col("result_column"))))
@@ -3916,5 +3913,9 @@ mod tests {
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
unimplemented!("not needed for tests")
}
+
+ fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<Field> {
+ unimplemented!("not needed for tests")
+ }
}
}
diff --git a/datafusion/physical-plan/Cargo.toml
b/datafusion/physical-plan/Cargo.toml
index 24387c5f15..c3f1b7eb0e 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -53,6 +53,7 @@ datafusion-execution = { workspace = true }
datafusion-expr = { workspace = true }
datafusion-functions-aggregate = { workspace = true }
datafusion-functions-aggregate-common = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
datafusion-physical-expr = { workspace = true, default-features = true }
datafusion-physical-expr-common = { workspace = true }
futures = { workspace = true }
diff --git a/datafusion/physical-plan/src/windows/mod.rs
b/datafusion/physical-plan/src/windows/mod.rs
index 0275cd2441..981a8e2851 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -51,6 +51,7 @@ mod utils;
mod window_agg_exec;
pub use bounded_window_agg_exec::BoundedWindowAggExec;
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_physical_expr::expressions::Column;
pub use datafusion_physical_expr::window::{
BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
@@ -73,7 +74,8 @@ pub fn schema_add_window_field(
.iter()
.map(|e| Arc::clone(e).as_ref().nullable(schema))
.collect::<Result<Vec<_>>>()?;
- let window_expr_return_type = window_fn.return_type(&data_types,
&nullability)?;
+ let window_expr_return_type =
+ window_fn.return_type(&data_types, &nullability, fn_name)?;
let mut window_fields = schema
.fields()
.iter()
@@ -334,13 +336,11 @@ fn create_udwf_window_expr(
.map(|arg| arg.data_type(input_schema))
.collect::<Result<_>>()?;
- // figure out the output type
- let data_type = fun.return_type(&input_types)?;
Ok(Arc::new(WindowUDFExpr {
fun: Arc::clone(fun),
args: args.to_vec(),
+ input_types,
name,
- data_type,
}))
}
@@ -351,8 +351,8 @@ struct WindowUDFExpr {
args: Vec<Arc<dyn PhysicalExpr>>,
/// Display name
name: String,
- /// result type
- data_type: DataType,
+ /// Types of input expressions
+ input_types: Vec<DataType>,
}
impl BuiltInWindowFunctionExpr for WindowUDFExpr {
@@ -361,11 +361,8 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
}
fn field(&self) -> Result<Field> {
- Ok(Field::new(
- &self.name,
- self.data_type.clone(),
- self.fun.nullable(),
- ))
+ self.fun
+ .field(WindowUDFFieldArgs::new(&self.input_types, &self.name))
}
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index ce40129fcf..d65c6ccaa6 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -60,6 +60,7 @@ serde_json = { workspace = true, optional = true }
[dev-dependencies]
datafusion-functions = { workspace = true, default-features = true }
datafusion-functions-aggregate = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
doc-comment = { workspace = true }
strum = { version = "0.26.1", features = ["derive"] }
tokio = { workspace = true, features = ["rt-multi-thread"] }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 133c38ab8c..1f1426164d 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -75,6 +75,7 @@ use datafusion_functions_aggregate::expr_fn::{
};
use datafusion_functions_aggregate::kurtosis_pop::kurtosis_pop;
use datafusion_functions_aggregate::string_agg::string_agg;
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
use datafusion_proto::bytes::{
logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec,
logical_plan_to_bytes, logical_plan_to_bytes_with_extension_codec,
@@ -2430,20 +2431,21 @@ fn roundtrip_window() {
&self.signature
}
- fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
- if arg_types.len() != 1 {
- return plan_err!(
- "dummy_udwf expects 1 argument, got {}: {:?}",
- arg_types.len(),
- arg_types
- );
- }
- Ok(arg_types[0].clone())
- }
-
fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
make_partition_evaluator()
}
+
+ fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+ if let Some(return_type) = field_args.get_input_type(0) {
+ Ok(Field::new(field_args.name(), return_type, true))
+ } else {
+ plan_err!(
+ "dummy_udwf expects 1 argument, got {}: {:?}",
+ field_args.input_types().len(),
+ field_args.input_types()
+ )
+ }
+ }
}
fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]