This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new e1b992a787 Add `field` trait method to `WindowUDFImpl`, remove 
`return_type`/`nullable` (#12374)
e1b992a787 is described below

commit e1b992a7878e78e8a63b7e24425c665727bda493
Author: jcsherin <[email protected]>
AuthorDate: Sat Sep 21 16:53:08 2024 +0530

    Add `field` trait method to `WindowUDFImpl`, remove 
`return_type`/`nullable` (#12374)
    
    * Adds new library `functions-window-common`
    
    * Adds `FieldArgs` struct for field of final result
    
    * Adds `field` method to `WindowUDFImpl` trait
    
    * Minor: fixes formatting
    
    * Fixes: udwf doc test
    
    * Fixes: implements missing trait items
    
    * Updates `datafusion-cli` dependencies
    
    * Fixes: formatting of `Cargo.toml` files
    
    * Fixes: implementation of `field` in udwf example
    
    * Pass `FieldArgs` argument to `field`
    
    * Use `field` in place of `return_type` for udwf
    
    * Update `field` in udwf implementations
    
    * Fixes: implementation of `field` in udwf example
    
    * Revert unrelated change
    
    * Mark `return_type` for udwf as unreachable
    
    * Delete code
    
    * Uses schema name of udwf to construct `FieldArgs`
    
    * Adds deprecated notice to `return_type` trait method
    
    * Add doc comments to `field` trait method
    
    * Reify `input_types` when creating the udwf window expression
    
    * Rename name field to `schema_name` in `FieldArgs`
    
    * Make `FieldArgs` opaque
    
    * Minor refactor
    
    * Removes `nullable` trait method from `WindowUDFImpl`
    
    * Add doc comments
    
    * Rename to `WindowUDFResultArgs`
    
    * Minor: fixes formatting
    
    * Copy edits for doc comments
    
    * Renames field to `function_name`
    
    * Rename struct to `WindowUDFFieldArgs`
    
    * Add comments for unreachable code
    
    * Copy edit for `WindowUDFImpl::field` trait method
    
    * Renames module
    
    * Fix warning: unused doc comment
    
    * Minor: rename bindings
    
    * Minor refactor
    
    * Minor: copy edit
    
    * Fixes: use `Expr::qualified_name` for window function name
    
    * Fixes: apply previous fix to `Expr::nullable`
    
    * Refactor: reuse type coercion for window functions
    
    * Fixes: clippy errors
    
    * Adds name parameter to `WindowFunctionDefinition::return_type`
    
    * Removes `return_type` field from `SimpleWindowUDF`
    
    * Add doc comment for helper method
    
    * Rewrite doc comments
    
    * Minor: remove empty comment
    
    * Remove `WindowUDFImpl::return_type`
    
    * Fixes doc test
---
 Cargo.toml                                         |   2 +
 datafusion-cli/Cargo.lock                          |  10 ++
 datafusion-examples/examples/advanced_udwf.rs      |  11 +-
 .../examples/simplify_udwf_expression.rs           |  12 +-
 datafusion/core/Cargo.toml                         |   1 +
 .../user_defined/user_defined_window_functions.rs  |  14 +--
 datafusion/expr/Cargo.toml                         |   1 +
 datafusion/expr/src/expr.rs                        |  32 ++---
 datafusion/expr/src/expr_fn.rs                     |  13 +-
 datafusion/expr/src/expr_schema.rs                 | 140 ++++++++++++---------
 datafusion/expr/src/function.rs                    |   2 +
 datafusion/expr/src/udwf.rs                        |  68 ++++------
 .../Cargo.toml                                     |  18 +--
 datafusion/functions-window-common/README.md       |  26 ++++
 datafusion/functions-window-common/src/field.rs    |  64 ++++++++++
 datafusion/functions-window-common/src/lib.rs      |  21 ++++
 datafusion/functions-window/Cargo.toml             |   1 +
 datafusion/functions-window/src/row_number.rs      |  11 +-
 datafusion/optimizer/Cargo.toml                    |   1 +
 .../src/simplify_expressions/expr_simplifier.rs    |   9 +-
 datafusion/physical-plan/Cargo.toml                |   1 +
 datafusion/physical-plan/src/windows/mod.rs        |  19 ++-
 datafusion/proto/Cargo.toml                        |   1 +
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  24 ++--
 24 files changed, 322 insertions(+), 180 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index c80297a1f5..e8cd52315a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -31,6 +31,7 @@ members = [
     "datafusion/functions-aggregate-common",
     "datafusion/functions-nested",
     "datafusion/functions-window",
+    "datafusion/functions-window-common",
     "datafusion/optimizer",
     "datafusion/physical-expr",
     "datafusion/physical-expr-common",
@@ -103,6 +104,7 @@ datafusion-functions-aggregate = { path = 
"datafusion/functions-aggregate", vers
 datafusion-functions-aggregate-common = { path = 
"datafusion/functions-aggregate-common", version = "42.0.0" }
 datafusion-functions-nested = { path = "datafusion/functions-nested", version 
= "42.0.0" }
 datafusion-functions-window = { path = "datafusion/functions-window", version 
= "42.0.0" }
+datafusion-functions-window-common = { path = 
"datafusion/functions-window-common", version = "42.0.0" }
 datafusion-optimizer = { path = "datafusion/optimizer", version = "42.0.0", 
default-features = false }
 datafusion-physical-expr = { path = "datafusion/physical-expr", version = 
"42.0.0", default-features = false }
 datafusion-physical-expr-common = { path = "datafusion/physical-expr-common", 
version = "42.0.0", default-features = false }
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 15a2b14ec3..fbe7d5c04b 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1343,6 +1343,7 @@ dependencies = [
  "datafusion-common",
  "datafusion-expr-common",
  "datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
  "datafusion-physical-expr-common",
  "paste",
  "serde_json",
@@ -1443,10 +1444,18 @@ version = "42.0.0"
 dependencies = [
  "datafusion-common",
  "datafusion-expr",
+ "datafusion-functions-window-common",
  "datafusion-physical-expr-common",
  "log",
 ]
 
+[[package]]
+name = "datafusion-functions-window-common"
+version = "42.0.0"
+dependencies = [
+ "datafusion-common",
+]
+
 [[package]]
 name = "datafusion-optimizer"
 version = "42.0.0"
@@ -1537,6 +1546,7 @@ dependencies = [
  "datafusion-expr",
  "datafusion-functions-aggregate",
  "datafusion-functions-aggregate-common",
+ "datafusion-functions-window-common",
  "datafusion-physical-expr",
  "datafusion-physical-expr-common",
  "futures",
diff --git a/datafusion-examples/examples/advanced_udwf.rs 
b/datafusion-examples/examples/advanced_udwf.rs
index ec0318a561..fd1b84070c 100644
--- a/datafusion-examples/examples/advanced_udwf.rs
+++ b/datafusion-examples/examples/advanced_udwf.rs
@@ -22,9 +22,11 @@ use arrow::{
     array::{ArrayRef, AsArray, Float64Array},
     datatypes::Float64Type,
 };
+use arrow_schema::Field;
 use datafusion::error::Result;
 use datafusion::prelude::*;
 use datafusion_common::ScalarValue;
+use datafusion_expr::function::WindowUDFFieldArgs;
 use datafusion_expr::{
     PartitionEvaluator, Signature, WindowFrame, WindowUDF, WindowUDFImpl,
 };
@@ -70,16 +72,15 @@ impl WindowUDFImpl for SmoothItUdf {
         &self.signature
     }
 
-    /// What is the type of value that will be returned by this function.
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(DataType::Float64)
-    }
-
     /// Create a `PartitionEvaluator` to evaluate this function on a new
     /// partition.
     fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::new(MyPartitionEvaluator::new()))
     }
+
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        Ok(Field::new(field_args.name(), DataType::Float64, true))
+    }
 }
 
 /// This implements the lowest level evaluation for a window function
diff --git a/datafusion-examples/examples/simplify_udwf_expression.rs 
b/datafusion-examples/examples/simplify_udwf_expression.rs
index a17e45dba2..1ff629eef1 100644
--- a/datafusion-examples/examples/simplify_udwf_expression.rs
+++ b/datafusion-examples/examples/simplify_udwf_expression.rs
@@ -17,12 +17,12 @@
 
 use std::any::Any;
 
-use arrow_schema::DataType;
+use arrow_schema::{DataType, Field};
 
 use datafusion::execution::context::SessionContext;
 use datafusion::functions_aggregate::average::avg_udaf;
 use datafusion::{error::Result, execution::options::CsvReadOptions};
-use datafusion_expr::function::WindowFunctionSimplification;
+use datafusion_expr::function::{WindowFunctionSimplification, 
WindowUDFFieldArgs};
 use datafusion_expr::{
     expr::WindowFunction, simplify::SimplifyInfo, Expr, PartitionEvaluator, 
Signature,
     Volatility, WindowUDF, WindowUDFImpl,
@@ -60,10 +60,6 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
         &self.signature
     }
 
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(DataType::Float64)
-    }
-
     fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         todo!()
     }
@@ -84,6 +80,10 @@ impl WindowUDFImpl for SimplifySmoothItUdf {
 
         Some(Box::new(simplify))
     }
+
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        Ok(Field::new(field_args.name(), DataType::Float64, true))
+    }
 }
 
 // create local execution context with `cars.csv` registered as a table named 
`cars`
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 82a799f858..01ba90ee5d 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -145,6 +145,7 @@ bigdecimal = { workspace = true }
 criterion = { version = "0.5", features = ["async_tokio"] }
 csv = "1.1.6"
 ctor = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
 doc-comment = { workspace = true }
 env_logger = { workspace = true }
 half = { workspace = true, default-features = true }
diff --git 
a/datafusion/core/tests/user_defined/user_defined_window_functions.rs 
b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
index 3c607301fc..d96bb23953 100644
--- a/datafusion/core/tests/user_defined/user_defined_window_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_window_functions.rs
@@ -29,12 +29,13 @@ use std::{
 
 use arrow::array::AsArray;
 use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringArray};
-use arrow_schema::DataType;
+use arrow_schema::{DataType, Field};
 use datafusion::{assert_batches_eq, prelude::SessionContext};
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::{
     PartitionEvaluator, Signature, Volatility, WindowUDF, WindowUDFImpl,
 };
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 
 /// A query with a window function evaluated over the entire partition
 const UNBOUNDED_WINDOW_QUERY: &str = "SELECT x, y, val, \
@@ -522,7 +523,6 @@ impl OddCounter {
         #[derive(Debug, Clone)]
         struct SimpleWindowUDF {
             signature: Signature,
-            return_type: DataType,
             test_state: Arc<TestState>,
             aliases: Vec<String>,
         }
@@ -531,10 +531,8 @@ impl OddCounter {
             fn new(test_state: Arc<TestState>) -> Self {
                 let signature =
                     Signature::exact(vec![DataType::Float64], 
Volatility::Immutable);
-                let return_type = DataType::Int64;
                 Self {
                     signature,
-                    return_type,
                     test_state,
                     aliases: vec!["odd_counter_alias".to_string()],
                 }
@@ -554,10 +552,6 @@ impl OddCounter {
                 &self.signature
             }
 
-            fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> 
{
-                Ok(self.return_type.clone())
-            }
-
             fn partition_evaluator(&self) -> Result<Box<dyn 
PartitionEvaluator>> {
                 Ok(Box::new(OddCounter::new(Arc::clone(&self.test_state))))
             }
@@ -565,6 +559,10 @@ impl OddCounter {
             fn aliases(&self) -> &[String] {
                 &self.aliases
             }
+
+            fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+                Ok(Field::new(field_args.name(), DataType::Int64, true))
+            }
         }
 
         ctx.register_udwf(WindowUDF::from(SimpleWindowUDF::new(test_state)))
diff --git a/datafusion/expr/Cargo.toml b/datafusion/expr/Cargo.toml
index b5d34d9a38..55387fea22 100644
--- a/datafusion/expr/Cargo.toml
+++ b/datafusion/expr/Cargo.toml
@@ -46,6 +46,7 @@ chrono = { workspace = true }
 datafusion-common = { workspace = true }
 datafusion-expr-common = { workspace = true }
 datafusion-functions-aggregate-common = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 paste = "^1.0"
 serde_json = { workspace = true }
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 8cb759b881..c141324962 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -40,6 +40,7 @@ use datafusion_common::tree_node::{
 use datafusion_common::{
     plan_err, Column, DFSchema, Result, ScalarValue, TableReference,
 };
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 use sqlparser::ast::{
     display_comma_separated, ExceptSelectItem, ExcludeSelectItem, 
IlikeSelectItem,
     NullTreatment, RenameSelectItem, ReplaceSelectElement,
@@ -706,6 +707,7 @@ impl WindowFunctionDefinition {
         &self,
         input_expr_types: &[DataType],
         _input_expr_nullable: &[bool],
+        display_name: &str,
     ) -> Result<DataType> {
         match self {
             WindowFunctionDefinition::BuiltInWindowFunction(fun) => {
@@ -714,7 +716,9 @@ impl WindowFunctionDefinition {
             WindowFunctionDefinition::AggregateUDF(fun) => {
                 fun.return_type(input_expr_types)
             }
-            WindowFunctionDefinition::WindowUDF(fun) => 
fun.return_type(input_expr_types),
+            WindowFunctionDefinition::WindowUDF(fun) => fun
+                .field(WindowUDFFieldArgs::new(input_expr_types, display_name))
+                .map(|field| field.data_type().clone()),
         }
     }
 
@@ -2536,10 +2540,10 @@ mod test {
     #[test]
     fn test_first_value_return_type() -> Result<()> {
         let fun = find_df_window_func("first_value").unwrap();
-        let observed = fun.return_type(&[DataType::Utf8], &[true])?;
+        let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
         assert_eq!(DataType::Utf8, observed);
 
-        let observed = fun.return_type(&[DataType::UInt64], &[true])?;
+        let observed = fun.return_type(&[DataType::UInt64], &[true], "")?;
         assert_eq!(DataType::UInt64, observed);
 
         Ok(())
@@ -2548,10 +2552,10 @@ mod test {
     #[test]
     fn test_last_value_return_type() -> Result<()> {
         let fun = find_df_window_func("last_value").unwrap();
-        let observed = fun.return_type(&[DataType::Utf8], &[true])?;
+        let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
         assert_eq!(DataType::Utf8, observed);
 
-        let observed = fun.return_type(&[DataType::Float64], &[true])?;
+        let observed = fun.return_type(&[DataType::Float64], &[true], "")?;
         assert_eq!(DataType::Float64, observed);
 
         Ok(())
@@ -2560,10 +2564,10 @@ mod test {
     #[test]
     fn test_lead_return_type() -> Result<()> {
         let fun = find_df_window_func("lead").unwrap();
-        let observed = fun.return_type(&[DataType::Utf8], &[true])?;
+        let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
         assert_eq!(DataType::Utf8, observed);
 
-        let observed = fun.return_type(&[DataType::Float64], &[true])?;
+        let observed = fun.return_type(&[DataType::Float64], &[true], "")?;
         assert_eq!(DataType::Float64, observed);
 
         Ok(())
@@ -2572,10 +2576,10 @@ mod test {
     #[test]
     fn test_lag_return_type() -> Result<()> {
         let fun = find_df_window_func("lag").unwrap();
-        let observed = fun.return_type(&[DataType::Utf8], &[true])?;
+        let observed = fun.return_type(&[DataType::Utf8], &[true], "")?;
         assert_eq!(DataType::Utf8, observed);
 
-        let observed = fun.return_type(&[DataType::Float64], &[true])?;
+        let observed = fun.return_type(&[DataType::Float64], &[true], "")?;
         assert_eq!(DataType::Float64, observed);
 
         Ok(())
@@ -2585,11 +2589,11 @@ mod test {
     fn test_nth_value_return_type() -> Result<()> {
         let fun = find_df_window_func("nth_value").unwrap();
         let observed =
-            fun.return_type(&[DataType::Utf8, DataType::UInt64], &[true, 
true])?;
+            fun.return_type(&[DataType::Utf8, DataType::UInt64], &[true, 
true], "")?;
         assert_eq!(DataType::Utf8, observed);
 
         let observed =
-            fun.return_type(&[DataType::Float64, DataType::UInt64], &[true, 
true])?;
+            fun.return_type(&[DataType::Float64, DataType::UInt64], &[true, 
true], "")?;
         assert_eq!(DataType::Float64, observed);
 
         Ok(())
@@ -2598,7 +2602,7 @@ mod test {
     #[test]
     fn test_percent_rank_return_type() -> Result<()> {
         let fun = find_df_window_func("percent_rank").unwrap();
-        let observed = fun.return_type(&[], &[])?;
+        let observed = fun.return_type(&[], &[], "")?;
         assert_eq!(DataType::Float64, observed);
 
         Ok(())
@@ -2607,7 +2611,7 @@ mod test {
     #[test]
     fn test_cume_dist_return_type() -> Result<()> {
         let fun = find_df_window_func("cume_dist").unwrap();
-        let observed = fun.return_type(&[], &[])?;
+        let observed = fun.return_type(&[], &[], "")?;
         assert_eq!(DataType::Float64, observed);
 
         Ok(())
@@ -2616,7 +2620,7 @@ mod test {
     #[test]
     fn test_ntile_return_type() -> Result<()> {
         let fun = find_df_window_func("ntile").unwrap();
-        let observed = fun.return_type(&[DataType::Int16], &[true])?;
+        let observed = fun.return_type(&[DataType::Int16], &[true], "")?;
         assert_eq!(DataType::UInt64, observed);
 
         Ok(())
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index 5fd3177bc2..2975e36488 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -38,6 +38,7 @@ use arrow::compute::kernels::cast_utils::{
 };
 use arrow::datatypes::{DataType, Field};
 use datafusion_common::{plan_err, Column, Result, ScalarValue, TableReference};
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 use sqlparser::ast::NullTreatment;
 use std::any::Any;
 use std::fmt::Debug;
@@ -657,13 +658,17 @@ impl WindowUDFImpl for SimpleWindowUDF {
         &self.signature
     }
 
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(self.return_type.clone())
-    }
-
     fn partition_evaluator(&self) -> Result<Box<dyn 
crate::PartitionEvaluator>> {
         (self.partition_evaluator_factory)()
     }
+
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        Ok(Field::new(
+            field_args.name(),
+            self.return_type.clone(),
+            true,
+        ))
+    }
 }
 
 pub fn interval_year_month_lit(value: &str) -> Expr {
diff --git a/datafusion/expr/src/expr_schema.rs 
b/datafusion/expr/src/expr_schema.rs
index 598d172d30..f40ac409dd 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -31,6 +31,7 @@ use datafusion_common::{
     not_impl_err, plan_datafusion_err, plan_err, Column, ExprSchema, Result,
     TableReference,
 };
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 use std::collections::HashMap;
 use std::sync::Arc;
 
@@ -166,49 +167,9 @@ impl ExprSchemable for Expr {
                 // expressiveness of `TypeSignature`), then infer return type
                 Ok(func.return_type_from_exprs(args, schema, &arg_data_types)?)
             }
-            Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
-                let data_types = args
-                    .iter()
-                    .map(|e| e.get_type(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                let nullability = args
-                    .iter()
-                    .map(|e| e.nullable(schema))
-                    .collect::<Result<Vec<_>>>()?;
-                match fun {
-                    WindowFunctionDefinition::AggregateUDF(udf) => {
-                        let new_types = 
data_types_with_aggregate_udf(&data_types, udf)
-                            .map_err(|err| {
-                            plan_datafusion_err!(
-                                "{} {}",
-                                err,
-                                utils::generate_signature_error_msg(
-                                    fun.name(),
-                                    fun.signature(),
-                                    &data_types
-                                )
-                            )
-                        })?;
-                        Ok(fun.return_type(&new_types, &nullability)?)
-                    }
-                    WindowFunctionDefinition::WindowUDF(udwf) => {
-                        let new_types = 
data_types_with_window_udf(&data_types, udwf)
-                            .map_err(|err| {
-                                plan_datafusion_err!(
-                                    "{} {}",
-                                    err,
-                                    utils::generate_signature_error_msg(
-                                        fun.name(),
-                                        fun.signature(),
-                                        &data_types
-                                    )
-                                )
-                            })?;
-                        Ok(fun.return_type(&new_types, &nullability)?)
-                    }
-                    _ => fun.return_type(&data_types, &nullability),
-                }
-            }
+            Expr::WindowFunction(window_function) => self
+                .data_type_and_nullable_with_window_function(schema, 
window_function)
+                .map(|(return_type, _)| return_type),
             Expr::AggregateFunction(AggregateFunction { func, args, .. }) => {
                 let data_types = args
                     .iter()
@@ -340,20 +301,12 @@ impl ExprSchemable for Expr {
             Expr::AggregateFunction(AggregateFunction { func, .. }) => {
                 Ok(func.is_nullable())
             }
-            Expr::WindowFunction(WindowFunction { fun, .. }) => match fun {
-                WindowFunctionDefinition::BuiltInWindowFunction(func) => {
-                    if func.name() == "RANK"
-                        || func.name() == "NTILE"
-                        || func.name() == "CUME_DIST"
-                    {
-                        Ok(false)
-                    } else {
-                        Ok(true)
-                    }
-                }
-                WindowFunctionDefinition::AggregateUDF(func) => 
Ok(func.is_nullable()),
-                WindowFunctionDefinition::WindowUDF(udwf) => 
Ok(udwf.nullable()),
-            },
+            Expr::WindowFunction(window_function) => self
+                .data_type_and_nullable_with_window_function(
+                    input_schema,
+                    window_function,
+                )
+                .map(|(_, nullable)| nullable),
             Expr::ScalarVariable(_, _)
             | Expr::TryCast { .. }
             | Expr::Unnest(_)
@@ -450,6 +403,9 @@ impl ExprSchemable for Expr {
                 let right = right.data_type_and_nullable(schema)?;
                 Ok((get_result_type(&left.0, op, &right.0)?, left.1 || 
right.1))
             }
+            Expr::WindowFunction(window_function) => {
+                self.data_type_and_nullable_with_window_function(schema, 
window_function)
+            }
             _ => Ok((self.get_type(schema)?, self.nullable(schema)?)),
         }
     }
@@ -499,6 +455,76 @@ impl ExprSchemable for Expr {
     }
 }
 
+impl Expr {
+    /// Common method for window functions that applies type coercion
+    /// to all arguments of the window function to check if it matches
+    /// its signature.
+    ///
+    /// If successful, this method returns the data type and
+    /// nullability of the window function's result.
+    ///
+    /// Otherwise, returns an error if there's a type mismatch between
+    /// the window function's signature and the provided arguments.
+    fn data_type_and_nullable_with_window_function(
+        &self,
+        schema: &dyn ExprSchema,
+        window_function: &WindowFunction,
+    ) -> Result<(DataType, bool)> {
+        let WindowFunction { fun, args, .. } = window_function;
+
+        let data_types = args
+            .iter()
+            .map(|e| e.get_type(schema))
+            .collect::<Result<Vec<_>>>()?;
+        match fun {
+            WindowFunctionDefinition::BuiltInWindowFunction(window_fun) => {
+                let return_type = window_fun.return_type(&data_types)?;
+                let nullable =
+                    !["RANK", "NTILE", 
"CUME_DIST"].contains(&window_fun.name());
+                Ok((return_type, nullable))
+            }
+            WindowFunctionDefinition::AggregateUDF(udaf) => {
+                let new_types = data_types_with_aggregate_udf(&data_types, 
udaf)
+                    .map_err(|err| {
+                        plan_datafusion_err!(
+                            "{} {}",
+                            err,
+                            utils::generate_signature_error_msg(
+                                fun.name(),
+                                fun.signature(),
+                                &data_types
+                            )
+                        )
+                    })?;
+
+                let return_type = udaf.return_type(&new_types)?;
+                let nullable = udaf.is_nullable();
+
+                Ok((return_type, nullable))
+            }
+            WindowFunctionDefinition::WindowUDF(udwf) => {
+                let new_types =
+                    data_types_with_window_udf(&data_types, 
udwf).map_err(|err| {
+                        plan_datafusion_err!(
+                            "{} {}",
+                            err,
+                            utils::generate_signature_error_msg(
+                                fun.name(),
+                                fun.signature(),
+                                &data_types
+                            )
+                        )
+                    })?;
+                let (_, function_name) = self.qualified_name();
+                let field_args = WindowUDFFieldArgs::new(&new_types, 
&function_name);
+
+                udwf.field(field_args)
+                    .map(|field| (field.data_type().clone(), 
field.is_nullable()))
+            }
+        }
+    }
+}
+
 /// cast subquery in InSubquery/ScalarSubquery to a given type.
 pub fn cast_subquery(subquery: Subquery, cast_to_type: &DataType) -> 
Result<Subquery> {
     if subquery.subquery.schema().field(0).data_type() == cast_to_type {
diff --git a/datafusion/expr/src/function.rs b/datafusion/expr/src/function.rs
index cd7a0c8aa9..9814d16ddf 100644
--- a/datafusion/expr/src/function.rs
+++ b/datafusion/expr/src/function.rs
@@ -27,6 +27,8 @@ pub use datafusion_functions_aggregate_common::accumulator::{
     AccumulatorArgs, AccumulatorFactoryFunction, StateFieldsArgs,
 };
 
+pub use datafusion_functions_window_common::field::WindowUDFFieldArgs;
+
 #[derive(Debug, Clone, Copy)]
 pub enum Hint {
     /// Indicates the argument needs to be padded if it is scalar
diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs
index fc4432ffdf..7cc57523a1 100644
--- a/datafusion/expr/src/udwf.rs
+++ b/datafusion/expr/src/udwf.rs
@@ -18,7 +18,6 @@
 //! [`WindowUDF`]: User Defined Window Functions
 
 use arrow::compute::SortOptions;
-use arrow::datatypes::DataType;
 use std::cmp::Ordering;
 use std::hash::{DefaultHasher, Hash, Hasher};
 use std::{
@@ -27,7 +26,10 @@ use std::{
     sync::Arc,
 };
 
+use arrow::datatypes::{DataType, Field};
+
 use datafusion_common::{not_impl_err, Result};
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 
 use crate::expr::WindowFunction;
 use crate::{
@@ -139,13 +141,6 @@ impl WindowUDF {
         self.inner.signature()
     }
 
-    /// Return the type of the function given its input types
-    ///
-    /// See [`WindowUDFImpl::return_type`] for more details.
-    pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
-        self.inner.return_type(args)
-    }
-
     /// Do the function rewrite
     ///
     /// See [`WindowUDFImpl::simplify`] for more details.
@@ -158,11 +153,11 @@ impl WindowUDF {
         self.inner.partition_evaluator()
     }
 
-    /// Returns if column values are nullable for this window function.
+    /// Returns the field of the final result of evaluating this window 
function.
     ///
-    /// See [`WindowUDFImpl::nullable`] for more details.
-    pub fn nullable(&self) -> bool {
-        self.inner.nullable()
+    /// See [`WindowUDFImpl::field`] for more details.
+    pub fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        self.inner.field(field_args)
     }
 
     /// Returns custom result ordering introduced by this window function
@@ -201,10 +196,11 @@ where
 /// # Basic Example
 /// ```
 /// # use std::any::Any;
-/// # use arrow::datatypes::DataType;
+/// # use arrow::datatypes::{DataType, Field};
 /// # use datafusion_common::{DataFusionError, plan_err, Result};
 /// # use datafusion_expr::{col, Signature, Volatility, PartitionEvaluator, 
WindowFrame, ExprFunctionExt};
 /// # use datafusion_expr::{WindowUDFImpl, WindowUDF};
+/// use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 /// #[derive(Debug, Clone)]
 /// struct SmoothIt {
 ///   signature: Signature
@@ -223,14 +219,15 @@ where
 ///    fn as_any(&self) -> &dyn Any { self }
 ///    fn name(&self) -> &str { "smooth_it" }
 ///    fn signature(&self) -> &Signature { &self.signature }
-///    fn return_type(&self, args: &[DataType]) -> Result<DataType> {
-///      if !matches!(args.get(0), Some(&DataType::Int32)) {
-///        return plan_err!("smooth_it only accepts Int32 arguments");
-///      }
-///      Ok(DataType::Int32)
-///    }
 ///    // The actual implementation would add one to the argument
 ///    fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> { 
unimplemented!() }
+///    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+///      if let Some(DataType::Int32) = field_args.get_input_type(0) {
+///        Ok(Field::new(field_args.name(), DataType::Int32, false))
+///      } else {
+///        plan_err!("smooth_it only accepts Int32 arguments")
+///      }
+///    }
 /// }
 ///
 /// // Create a new WindowUDF from the implementation
@@ -259,10 +256,6 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
     /// types are accepted and the function's Volatility.
     fn signature(&self) -> &Signature;
 
-    /// What [`DataType`] will be returned by this function, given the types of
-    /// the arguments
-    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType>;
-
     /// Invoke the function, returning the [`PartitionEvaluator`] instance
     fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>>;
 
@@ -324,14 +317,8 @@ pub trait WindowUDFImpl: Debug + Send + Sync {
         hasher.finish()
     }
 
-    /// Allows customizing nullable of column for this window UDF.
-    ///
-    /// By default, the final result of evaluating the window UDF is
-    /// allowed to have null values. But if that is not the case then
-    /// it can be customized in the window UDF implementation.
-    fn nullable(&self) -> bool {
-        true
-    }
+    /// The [`Field`] of the final result of evaluating this window function.
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field>;
 
     /// Allows the window UDF to define a custom result ordering.
     ///
@@ -414,10 +401,6 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
         self.inner.signature()
     }
 
-    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        self.inner.return_type(arg_types)
-    }
-
     fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         self.inner.partition_evaluator()
     }
@@ -445,8 +428,8 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
         hasher.finish()
     }
 
-    fn nullable(&self) -> bool {
-        self.inner.nullable()
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        self.inner.field(field_args)
     }
 
     fn sort_options(&self) -> Option<SortOptions> {
@@ -461,9 +444,10 @@ impl WindowUDFImpl for AliasedWindowUDFImpl {
 #[cfg(test)]
 mod test {
     use crate::{PartitionEvaluator, WindowUDF, WindowUDFImpl};
-    use arrow::datatypes::DataType;
+    use arrow::datatypes::{DataType, Field};
     use datafusion_common::Result;
     use datafusion_expr_common::signature::{Signature, Volatility};
+    use datafusion_functions_window_common::field::WindowUDFFieldArgs;
     use std::any::Any;
     use std::cmp::Ordering;
 
@@ -495,10 +479,10 @@ mod test {
         fn signature(&self) -> &Signature {
             &self.signature
         }
-        fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
+        fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
             unimplemented!()
         }
-        fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+        fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<Field> {
             unimplemented!()
         }
     }
@@ -531,10 +515,10 @@ mod test {
         fn signature(&self) -> &Signature {
             &self.signature
         }
-        fn return_type(&self, _args: &[DataType]) -> Result<DataType> {
+        fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
             unimplemented!()
         }
-        fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
+        fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<Field> {
             unimplemented!()
         }
     }
diff --git a/datafusion/functions-window/Cargo.toml 
b/datafusion/functions-window-common/Cargo.toml
similarity index 80%
copy from datafusion/functions-window/Cargo.toml
copy to datafusion/functions-window-common/Cargo.toml
index 94dd421284..98b6f8c6db 100644
--- a/datafusion/functions-window/Cargo.toml
+++ b/datafusion/functions-window-common/Cargo.toml
@@ -16,32 +16,26 @@
 # under the License.
 
 [package]
-name = "datafusion-functions-window"
-description = "Window function packages for the DataFusion query engine"
+name = "datafusion-functions-window-common"
+description = "Common functions for implementing user-defined window functions 
for the DataFusion query engine"
 keywords = ["datafusion", "logical", "plan", "expressions"]
 readme = "README.md"
-version = { workspace = true }
+authors = { workspace = true }
 edition = { workspace = true }
 homepage = { workspace = true }
-repository = { workspace = true }
 license = { workspace = true }
-authors = { workspace = true }
+repository = { workspace = true }
 rust-version = { workspace = true }
+version = { workspace = true }
 
 [lints]
 workspace = true
 
 [lib]
-name = "datafusion_functions_window"
+name = "datafusion_functions_window_common"
 path = "src/lib.rs"
 
 # See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
 datafusion-common = { workspace = true }
-datafusion-expr = { workspace = true }
-datafusion-physical-expr-common = { workspace = true }
-log = { workspace = true }
-
-[dev-dependencies]
-arrow = { workspace = true }
diff --git a/datafusion/functions-window-common/README.md 
b/datafusion/functions-window-common/README.md
new file mode 100644
index 0000000000..de12d25f97
--- /dev/null
+++ b/datafusion/functions-window-common/README.md
@@ -0,0 +1,26 @@
+<!---
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+-->
+
+# DataFusion Window Function Common Library
+
+[DataFusion][df] is an extensible query execution framework, written in Rust, 
that uses Apache Arrow as its in-memory format.
+
+This crate contains common functions for implementing user-defined window 
functions.
+
+[df]: https://crates.io/crates/datafusion
diff --git a/datafusion/functions-window-common/src/field.rs 
b/datafusion/functions-window-common/src/field.rs
new file mode 100644
index 0000000000..8011b7b0f0
--- /dev/null
+++ b/datafusion/functions-window-common/src/field.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::arrow::datatypes::DataType;
+
+/// Metadata for defining the result field from evaluating a
+/// user-defined window function.
+pub struct WindowUDFFieldArgs<'a> {
+    /// The data types corresponding to the arguments to the
+    /// user-defined window function.
+    input_types: &'a [DataType],
+    /// The display name of the user-defined window function.
+    display_name: &'a str,
+}
+
+impl<'a> WindowUDFFieldArgs<'a> {
+    /// Create an instance of [`WindowUDFFieldArgs`].
+    ///
+    /// # Arguments
+    ///
+    /// * `input_types` - The data types corresponding to the
+    ///     arguments to the user-defined window function.
+    /// * `function_name` - The qualified schema name of the
+    ///     user-defined window function expression.
+    ///
+    pub fn new(input_types: &'a [DataType], display_name: &'a str) -> Self {
+        WindowUDFFieldArgs {
+            input_types,
+            display_name,
+        }
+    }
+
+    /// Returns the data type of input expressions passed as arguments
+    /// to the user-defined window function.
+    pub fn input_types(&self) -> &[DataType] {
+        self.input_types
+    }
+
+    /// Returns the name for the field of the final result of evaluating
+    /// the user-defined window function.
+    pub fn name(&self) -> &str {
+        self.display_name
+    }
+
+    /// Returns `Some(DataType)` of input expression at index, otherwise
+    /// returns `None` if the index is out of bounds.
+    pub fn get_input_type(&self, index: usize) -> Option<DataType> {
+        self.input_types.get(index).cloned()
+    }
+}
diff --git a/datafusion/functions-window-common/src/lib.rs 
b/datafusion/functions-window-common/src/lib.rs
new file mode 100644
index 0000000000..2e4bcbbc83
--- /dev/null
+++ b/datafusion/functions-window-common/src/lib.rs
@@ -0,0 +1,21 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Common user-defined window functionality for [DataFusion]
+//!
+//! [DataFusion]: <https://crates.io/crates/datafusion>
+pub mod field;
diff --git a/datafusion/functions-window/Cargo.toml 
b/datafusion/functions-window/Cargo.toml
index 94dd421284..8dcec6bc96 100644
--- a/datafusion/functions-window/Cargo.toml
+++ b/datafusion/functions-window/Cargo.toml
@@ -40,6 +40,7 @@ path = "src/lib.rs"
 [dependencies]
 datafusion-common = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
 datafusion-physical-expr-common = { workspace = true }
 log = { workspace = true }
 
diff --git a/datafusion/functions-window/src/row_number.rs 
b/datafusion/functions-window/src/row_number.rs
index 43d2796ad7..7f348bf9d2 100644
--- a/datafusion/functions-window/src/row_number.rs
+++ b/datafusion/functions-window/src/row_number.rs
@@ -25,9 +25,12 @@ use datafusion_common::arrow::array::ArrayRef;
 use datafusion_common::arrow::array::UInt64Array;
 use datafusion_common::arrow::compute::SortOptions;
 use datafusion_common::arrow::datatypes::DataType;
+use datafusion_common::arrow::datatypes::Field;
 use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::expr::WindowFunction;
 use datafusion_expr::{Expr, PartitionEvaluator, Signature, Volatility, 
WindowUDFImpl};
+use datafusion_functions_window_common::field;
+use field::WindowUDFFieldArgs;
 
 /// Create a [`WindowFunction`](Expr::WindowFunction) expression for
 /// `row_number` user-defined window function.
@@ -84,16 +87,12 @@ impl WindowUDFImpl for RowNumber {
         &self.signature
     }
 
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(DataType::UInt64)
-    }
-
     fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
         Ok(Box::<NumRowsEvaluator>::default())
     }
 
-    fn nullable(&self) -> bool {
-        false
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+        Ok(Field::new(field_args.name(), DataType::UInt64, false))
     }
 
     fn sort_options(&self) -> Option<SortOptions> {
diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml
index 1a9e9630c0..337a24ffae 100644
--- a/datafusion/optimizer/Cargo.toml
+++ b/datafusion/optimizer/Cargo.toml
@@ -57,5 +57,6 @@ regex-syntax = "0.8.0"
 arrow-buffer = { workspace = true }
 ctor = { workspace = true }
 datafusion-functions-aggregate = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
 datafusion-sql = { workspace = true }
 env_logger = { workspace = true }
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs 
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index fc3921d296..a78a54a571 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -1798,6 +1798,7 @@ mod tests {
         interval_arithmetic::Interval,
         *,
     };
+    use datafusion_functions_window_common::field::WindowUDFFieldArgs;
     use std::{
         collections::HashMap,
         ops::{BitAnd, BitOr, BitXor},
@@ -3901,10 +3902,6 @@ mod tests {
             unimplemented!()
         }
 
-        fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-            unimplemented!("not needed for tests")
-        }
-
         fn simplify(&self) -> Option<WindowFunctionSimplification> {
             if self.simplify {
                 Some(Box::new(|_, _| Ok(col("result_column"))))
@@ -3916,5 +3913,9 @@ mod tests {
         fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
             unimplemented!("not needed for tests")
         }
+
+        fn field(&self, _field_args: WindowUDFFieldArgs) -> Result<Field> {
+            unimplemented!("not needed for tests")
+        }
     }
 }
diff --git a/datafusion/physical-plan/Cargo.toml 
b/datafusion/physical-plan/Cargo.toml
index 24387c5f15..c3f1b7eb0e 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -53,6 +53,7 @@ datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
 datafusion-functions-aggregate = { workspace = true }
 datafusion-functions-aggregate-common = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
 datafusion-physical-expr-common = { workspace = true }
 futures = { workspace = true }
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index 0275cd2441..981a8e2851 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -51,6 +51,7 @@ mod utils;
 mod window_agg_exec;
 
 pub use bounded_window_agg_exec::BoundedWindowAggExec;
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 use datafusion_physical_expr::expressions::Column;
 pub use datafusion_physical_expr::window::{
     BuiltInWindowExpr, PlainAggregateWindowExpr, WindowExpr,
@@ -73,7 +74,8 @@ pub fn schema_add_window_field(
         .iter()
         .map(|e| Arc::clone(e).as_ref().nullable(schema))
         .collect::<Result<Vec<_>>>()?;
-    let window_expr_return_type = window_fn.return_type(&data_types, 
&nullability)?;
+    let window_expr_return_type =
+        window_fn.return_type(&data_types, &nullability, fn_name)?;
     let mut window_fields = schema
         .fields()
         .iter()
@@ -334,13 +336,11 @@ fn create_udwf_window_expr(
         .map(|arg| arg.data_type(input_schema))
         .collect::<Result<_>>()?;
 
-    // figure out the output type
-    let data_type = fun.return_type(&input_types)?;
     Ok(Arc::new(WindowUDFExpr {
         fun: Arc::clone(fun),
         args: args.to_vec(),
+        input_types,
         name,
-        data_type,
     }))
 }
 
@@ -351,8 +351,8 @@ struct WindowUDFExpr {
     args: Vec<Arc<dyn PhysicalExpr>>,
     /// Display name
     name: String,
-    /// result type
-    data_type: DataType,
+    /// Types of input expressions
+    input_types: Vec<DataType>,
 }
 
 impl BuiltInWindowFunctionExpr for WindowUDFExpr {
@@ -361,11 +361,8 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
     }
 
     fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            self.data_type.clone(),
-            self.fun.nullable(),
-        ))
+        self.fun
+            .field(WindowUDFFieldArgs::new(&self.input_types, &self.name))
     }
 
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
diff --git a/datafusion/proto/Cargo.toml b/datafusion/proto/Cargo.toml
index ce40129fcf..d65c6ccaa6 100644
--- a/datafusion/proto/Cargo.toml
+++ b/datafusion/proto/Cargo.toml
@@ -60,6 +60,7 @@ serde_json = { workspace = true, optional = true }
 [dev-dependencies]
 datafusion-functions = { workspace = true, default-features = true }
 datafusion-functions-aggregate = { workspace = true }
+datafusion-functions-window-common = { workspace = true }
 doc-comment = { workspace = true }
 strum = { version = "0.26.1", features = ["derive"] }
 tokio = { workspace = true, features = ["rt-multi-thread"] }
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index 133c38ab8c..1f1426164d 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -75,6 +75,7 @@ use datafusion_functions_aggregate::expr_fn::{
 };
 use datafusion_functions_aggregate::kurtosis_pop::kurtosis_pop;
 use datafusion_functions_aggregate::string_agg::string_agg;
+use datafusion_functions_window_common::field::WindowUDFFieldArgs;
 use datafusion_proto::bytes::{
     logical_plan_from_bytes, logical_plan_from_bytes_with_extension_codec,
     logical_plan_to_bytes, logical_plan_to_bytes_with_extension_codec,
@@ -2430,20 +2431,21 @@ fn roundtrip_window() {
             &self.signature
         }
 
-        fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-            if arg_types.len() != 1 {
-                return plan_err!(
-                    "dummy_udwf expects 1 argument, got {}: {:?}",
-                    arg_types.len(),
-                    arg_types
-                );
-            }
-            Ok(arg_types[0].clone())
-        }
-
         fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
             make_partition_evaluator()
         }
+
+        fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
+            if let Some(return_type) = field_args.get_input_type(0) {
+                Ok(Field::new(field_args.name(), return_type, true))
+            } else {
+                plan_err!(
+                    "dummy_udwf expects 1 argument, got {}: {:?}",
+                    field_args.input_types().len(),
+                    field_args.input_types()
+                )
+            }
+        }
     }
 
     fn make_partition_evaluator() -> Result<Box<dyn PartitionEvaluator>> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to