This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f11125744 Make fields of `ScalarUDF` , `AggregateUDF` and `WindowUDF` 
non `pub` (#8079)
7f11125744 is described below

commit 7f111257443d79259eacbe3cb2ace1bdd276e5fc
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Nov 15 04:59:23 2023 -0500

    Make fields of `ScalarUDF` , `AggregateUDF` and `WindowUDF` non `pub` 
(#8079)
    
    * Make fields of ScalarUDF non pub
    
    * Make fields of `WindowUDF` and `AggregateUDF` non pub.
    
    * fix doc
---
 datafusion/core/src/datasource/listing/helpers.rs  |  2 +-
 datafusion/core/src/execution/context/mod.rs       |  6 +--
 datafusion/core/src/physical_planner.rs            |  4 +-
 datafusion/expr/src/expr.rs                        | 14 +++---
 datafusion/expr/src/expr_schema.rs                 |  4 +-
 datafusion/expr/src/udaf.rs                        | 47 +++++++++++++++++---
 datafusion/expr/src/udf.rs                         | 50 ++++++++++++++++++----
 datafusion/expr/src/udwf.rs                        | 44 ++++++++++++++-----
 datafusion/expr/src/window_function.rs             | 12 ++----
 datafusion/optimizer/src/analyzer/type_coercion.rs |  4 +-
 .../src/simplify_expressions/expr_simplifier.rs    |  2 +-
 datafusion/physical-expr/src/functions.rs          |  2 +-
 datafusion/physical-expr/src/scalar_function.rs    |  6 +--
 datafusion/physical-expr/src/udf.rs                |  6 +--
 datafusion/physical-plan/src/udaf.rs               | 10 +++--
 datafusion/physical-plan/src/windows/mod.rs        | 12 ++----
 datafusion/proto/src/logical_plan/to_proto.rs      |  8 ++--
 datafusion/proto/src/physical_plan/from_proto.rs   |  7 ++-
 datafusion/proto/src/physical_plan/to_proto.rs     |  3 +-
 .../proto/tests/cases/roundtrip_logical_plan.rs    | 12 +++---
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  4 +-
 21 files changed, 172 insertions(+), 87 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/helpers.rs 
b/datafusion/core/src/datasource/listing/helpers.rs
index 1d929f4bd4..3d2a3dc928 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -102,7 +102,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr: 
&Expr) -> bool {
                 }
             }
             Expr::ScalarUDF(ScalarUDF { fun, .. }) => {
-                match fun.signature.volatility {
+                match fun.signature().volatility {
                     Volatility::Immutable => VisitRecursion::Continue,
                     // TODO: Stable functions could be `applicable`, but that 
would require access to the context
                     Volatility::Stable | Volatility::Volatile => {
diff --git a/datafusion/core/src/execution/context/mod.rs 
b/datafusion/core/src/execution/context/mod.rs
index 9c500ec072..5c79c407b7 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -806,7 +806,7 @@ impl SessionContext {
         self.state
             .write()
             .scalar_functions
-            .insert(f.name.clone(), Arc::new(f));
+            .insert(f.name().to_string(), Arc::new(f));
     }
 
     /// Registers an aggregate UDF within this context.
@@ -820,7 +820,7 @@ impl SessionContext {
         self.state
             .write()
             .aggregate_functions
-            .insert(f.name.clone(), Arc::new(f));
+            .insert(f.name().to_string(), Arc::new(f));
     }
 
     /// Registers a window UDF within this context.
@@ -834,7 +834,7 @@ impl SessionContext {
         self.state
             .write()
             .window_functions
-            .insert(f.name.clone(), Arc::new(f));
+            .insert(f.name().to_string(), Arc::new(f));
     }
 
     /// Creates a [`DataFrame`] for reading a data source.
diff --git a/datafusion/core/src/physical_planner.rs 
b/datafusion/core/src/physical_planner.rs
index 9c1d978acc..fffc51abeb 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -222,7 +222,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> 
Result<String> {
             create_function_physical_name(&func.fun.to_string(), false, 
&func.args)
         }
         Expr::ScalarUDF(ScalarUDF { fun, args }) => {
-            create_function_physical_name(&fun.name, false, args)
+            create_function_physical_name(fun.name(), false, args)
         }
         Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
             create_function_physical_name(&fun.to_string(), false, args)
@@ -250,7 +250,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> 
Result<String> {
             for e in args {
                 names.push(create_physical_name(e, false)?);
             }
-            Ok(format!("{}({})", fun.name, names.join(",")))
+            Ok(format!("{}({})", fun.name(), names.join(",")))
         }
         Expr::GroupingSet(grouping_set) => match grouping_set {
             GroupingSet::Rollup(exprs) => Ok(format!(
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 97e4fcc327..2b2d30af3b 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -338,7 +338,7 @@ impl Between {
     }
 }
 
-/// ScalarFunction expression
+/// ScalarFunction expression invokes a built-in scalar function
 #[derive(Clone, PartialEq, Eq, Hash, Debug)]
 pub struct ScalarFunction {
     /// The function
@@ -354,7 +354,9 @@ impl ScalarFunction {
     }
 }
 
-/// ScalarUDF expression
+/// ScalarUDF expression invokes a user-defined scalar function [`ScalarUDF`]
+///
+/// [`ScalarUDF`]: crate::ScalarUDF
 #[derive(Clone, PartialEq, Eq, Hash, Debug)]
 pub struct ScalarUDF {
     /// The function
@@ -1200,7 +1202,7 @@ impl fmt::Display for Expr {
                 fmt_function(f, &func.fun.to_string(), false, &func.args, true)
             }
             Expr::ScalarUDF(ScalarUDF { fun, args }) => {
-                fmt_function(f, &fun.name, false, args, true)
+                fmt_function(f, fun.name(), false, args, true)
             }
             Expr::WindowFunction(WindowFunction {
                 fun,
@@ -1247,7 +1249,7 @@ impl fmt::Display for Expr {
                 order_by,
                 ..
             }) => {
-                fmt_function(f, &fun.name, false, args, true)?;
+                fmt_function(f, fun.name(), false, args, true)?;
                 if let Some(fe) = filter {
                     write!(f, " FILTER (WHERE {fe})")?;
                 }
@@ -1536,7 +1538,7 @@ fn create_name(e: &Expr) -> Result<String> {
             create_function_name(&func.fun.to_string(), false, &func.args)
         }
         Expr::ScalarUDF(ScalarUDF { fun, args }) => {
-            create_function_name(&fun.name, false, args)
+            create_function_name(fun.name(), false, args)
         }
         Expr::WindowFunction(WindowFunction {
             fun,
@@ -1589,7 +1591,7 @@ fn create_name(e: &Expr) -> Result<String> {
             if let Some(ob) = order_by {
                 info += &format!(" ORDER BY ([{}])", expr_vec_fmt!(ob));
             }
-            Ok(format!("{}({}){}", fun.name, names.join(","), info))
+            Ok(format!("{}({}){}", fun.name(), names.join(","), info))
         }
         Expr::GroupingSet(grouping_set) => match grouping_set {
             GroupingSet::Rollup(exprs) => {
diff --git a/datafusion/expr/src/expr_schema.rs 
b/datafusion/expr/src/expr_schema.rs
index 5881feece1..0d06a12951 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -87,7 +87,7 @@ impl ExprSchemable for Expr {
                     .iter()
                     .map(|e| e.get_type(schema))
                     .collect::<Result<Vec<_>>>()?;
-                Ok((fun.return_type)(&data_types)?.as_ref().clone())
+                Ok(fun.return_type(&data_types)?)
             }
             Expr::ScalarFunction(ScalarFunction { fun, args }) => {
                 let arg_data_types = args
@@ -128,7 +128,7 @@ impl ExprSchemable for Expr {
                     .iter()
                     .map(|e| e.get_type(schema))
                     .collect::<Result<Vec<_>>>()?;
-                Ok((fun.return_type)(&data_types)?.as_ref().clone())
+                fun.return_type(&data_types)
             }
             Expr::Not(_)
             | Expr::IsNull(_)
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index 84e238a121..b06e97acc2 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Udaf module contains functions and structs supporting user-defined 
aggregate functions.
+//! [`AggregateUDF`]: User Defined Aggregate Functions
 
-use crate::Expr;
+use crate::{Accumulator, Expr};
 use crate::{
     AccumulatorFactoryFunction, ReturnTypeFunction, Signature, 
StateTypeFunction,
 };
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
 use std::fmt::{self, Debug, Formatter};
 use std::sync::Arc;
 
@@ -46,15 +48,15 @@ use std::sync::Arc;
 #[derive(Clone)]
 pub struct AggregateUDF {
     /// name
-    pub name: String,
+    name: String,
     /// Signature (input arguments)
-    pub signature: Signature,
+    signature: Signature,
     /// Return type
-    pub return_type: ReturnTypeFunction,
+    return_type: ReturnTypeFunction,
     /// actual implementation
-    pub accumulator: AccumulatorFactoryFunction,
+    accumulator: AccumulatorFactoryFunction,
     /// the accumulator's state's description as a function of the return type
-    pub state_type: StateTypeFunction,
+    state_type: StateTypeFunction,
 }
 
 impl Debug for AggregateUDF {
@@ -112,4 +114,35 @@ impl AggregateUDF {
             order_by: None,
         })
     }
+
+    /// Returns this function's name
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    /// Returns this function's signature (what input types are accepted)
+    pub fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    /// Return the type of the function given its input types
+    pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+        // Old API returns an Arc of the datatype for some reason
+        let res = (self.return_type)(args)?;
+        Ok(res.as_ref().clone())
+    }
+
+    /// Return an accumualator the given aggregate, given
+    /// its return datatype.
+    pub fn accumulator(&self, return_type: &DataType) -> Result<Box<dyn 
Accumulator>> {
+        (self.accumulator)(return_type)
+    }
+
+    /// Return the type of the intermediate state used by this aggregator, 
given
+    /// its return datatype. Supports multi-phase aggregations
+    pub fn state_type(&self, return_type: &DataType) -> Result<Vec<DataType>> {
+        // old API returns an Arc for some reason, try and unwrap it here
+        let res = (self.state_type)(return_type)?;
+        Ok(Arc::try_unwrap(res).unwrap_or_else(|res| res.as_ref().clone()))
+    }
 }
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index be6c90aa59..22e56caaaf 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -15,23 +15,31 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Udf module contains foundational types that are used to represent UDFs in 
DataFusion.
+//! [`ScalarUDF`]: Scalar User Defined Functions
 
 use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature};
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
 use std::fmt;
 use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
-/// Logical representation of a UDF.
+/// Logical representation of a Scalar User Defined Function.
+///
+/// A scalar function produces a single row output for each row of input.
+///
+/// This struct contains the information DataFusion needs to plan and invoke
+/// functions such name, type signature, return type, and actual 
implementation.
+///
 #[derive(Clone)]
 pub struct ScalarUDF {
-    /// name
-    pub name: String,
-    /// signature
-    pub signature: Signature,
-    /// Return type
-    pub return_type: ReturnTypeFunction,
+    /// The name of the function
+    name: String,
+    /// The signature (the types of arguments that are supported)
+    signature: Signature,
+    /// Function that returns the return type given the argument types
+    return_type: ReturnTypeFunction,
     /// actual implementation
     ///
     /// The fn param is the wrapped function but be aware that the function 
will
@@ -40,7 +48,7 @@ pub struct ScalarUDF {
     /// will be passed. In that case the single element is a null array to 
indicate
     /// the batch's row count (so that the generative zero-argument function 
can know
     /// the result array size).
-    pub fun: ScalarFunctionImplementation,
+    fun: ScalarFunctionImplementation,
 }
 
 impl Debug for ScalarUDF {
@@ -89,4 +97,28 @@ impl ScalarUDF {
     pub fn call(&self, args: Vec<Expr>) -> Expr {
         Expr::ScalarUDF(crate::expr::ScalarUDF::new(Arc::new(self.clone()), 
args))
     }
+
+    /// Returns this function's name
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    /// Returns this function's signature (what input types are accepted)
+    pub fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    /// Return the type of the function given its input types
+    pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+        // Old API returns an Arc of the datatype for some reason
+        let res = (self.return_type)(args)?;
+        Ok(res.as_ref().clone())
+    }
+
+    /// Return the actual implementation
+    pub fn fun(&self) -> ScalarFunctionImplementation {
+        self.fun.clone()
+    }
+
+    // TODO maybe add an invoke() method that runs the actual function?
 }
diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs
index c0a2a8205a..c233ee84b3 100644
--- a/datafusion/expr/src/udwf.rs
+++ b/datafusion/expr/src/udwf.rs
@@ -15,17 +15,19 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Support for user-defined window (UDWF) window functions
+//! [`WindowUDF`]: User Defined Window Functions
 
+use crate::{
+    Expr, PartitionEvaluator, PartitionEvaluatorFactory, ReturnTypeFunction, 
Signature,
+    WindowFrame,
+};
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
 use std::{
     fmt::{self, Debug, Display, Formatter},
     sync::Arc,
 };
 
-use crate::{
-    Expr, PartitionEvaluatorFactory, ReturnTypeFunction, Signature, 
WindowFrame,
-};
-
 /// Logical representation of a user-defined window function (UDWF)
 /// A UDWF is different from a UDF in that it is stateful across batches.
 ///
@@ -35,13 +37,13 @@ use crate::{
 #[derive(Clone)]
 pub struct WindowUDF {
     /// name
-    pub name: String,
+    name: String,
     /// signature
-    pub signature: Signature,
+    signature: Signature,
     /// Return type
-    pub return_type: ReturnTypeFunction,
+    return_type: ReturnTypeFunction,
     /// Return the partition evaluator
-    pub partition_evaluator_factory: PartitionEvaluatorFactory,
+    partition_evaluator_factory: PartitionEvaluatorFactory,
 }
 
 impl Debug for WindowUDF {
@@ -86,7 +88,7 @@ impl WindowUDF {
         partition_evaluator_factory: &PartitionEvaluatorFactory,
     ) -> Self {
         Self {
-            name: name.to_owned(),
+            name: name.to_string(),
             signature: signature.clone(),
             return_type: return_type.clone(),
             partition_evaluator_factory: partition_evaluator_factory.clone(),
@@ -115,4 +117,26 @@ impl WindowUDF {
             window_frame,
         })
     }
+
+    /// Returns this function's name
+    pub fn name(&self) -> &str {
+        &self.name
+    }
+
+    /// Returns this function's signature (what input types are accepted)
+    pub fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    /// Return the type of the function given its input types
+    pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+        // Old API returns an Arc of the datatype for some reason
+        let res = (self.return_type)(args)?;
+        Ok(res.as_ref().clone())
+    }
+
+    /// Return a `PartitionEvaluator` for evaluating this window function
+    pub fn partition_evaluator_factory(&self) -> Result<Box<dyn 
PartitionEvaluator>> {
+        (self.partition_evaluator_factory)()
+    }
 }
diff --git a/datafusion/expr/src/window_function.rs 
b/datafusion/expr/src/window_function.rs
index 463cceafeb..35b7bded70 100644
--- a/datafusion/expr/src/window_function.rs
+++ b/datafusion/expr/src/window_function.rs
@@ -171,12 +171,8 @@ impl WindowFunction {
             WindowFunction::BuiltInWindowFunction(fun) => {
                 fun.return_type(input_expr_types)
             }
-            WindowFunction::AggregateUDF(fun) => {
-                Ok((*(fun.return_type)(input_expr_types)?).clone())
-            }
-            WindowFunction::WindowUDF(fun) => {
-                Ok((*(fun.return_type)(input_expr_types)?).clone())
-            }
+            WindowFunction::AggregateUDF(fun) => 
fun.return_type(input_expr_types),
+            WindowFunction::WindowUDF(fun) => 
fun.return_type(input_expr_types),
         }
     }
 }
@@ -234,8 +230,8 @@ impl WindowFunction {
         match self {
             WindowFunction::AggregateFunction(fun) => fun.signature(),
             WindowFunction::BuiltInWindowFunction(fun) => fun.signature(),
-            WindowFunction::AggregateUDF(fun) => fun.signature.clone(),
-            WindowFunction::WindowUDF(fun) => fun.signature.clone(),
+            WindowFunction::AggregateUDF(fun) => fun.signature().clone(),
+            WindowFunction::WindowUDF(fun) => fun.signature().clone(),
         }
     }
 }
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs 
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index bfdbec3901..57dabbfee4 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -323,7 +323,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
                 let new_expr = coerce_arguments_for_signature(
                     args.as_slice(),
                     &self.schema,
-                    &fun.signature,
+                    fun.signature(),
                 )?;
                 Ok(Expr::ScalarUDF(ScalarUDF::new(fun, new_expr)))
             }
@@ -364,7 +364,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
                 let new_expr = coerce_arguments_for_signature(
                     args.as_slice(),
                     &self.schema,
-                    &fun.signature,
+                    fun.signature(),
                 )?;
                 let expr = Expr::AggregateUDF(expr::AggregateUDF::new(
                     fun, new_expr, filter, order_by,
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs 
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index c5a1aacce7..947a6f6070 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -349,7 +349,7 @@ impl<'a> ConstEvaluator<'a> {
                 Self::volatility_ok(fun.volatility())
             }
             Expr::ScalarUDF(expr::ScalarUDF { fun, .. }) => {
-                Self::volatility_ok(fun.signature.volatility)
+                Self::volatility_ok(fun.signature().volatility)
             }
             Expr::Literal(_)
             | Expr::BinaryExpr { .. }
diff --git a/datafusion/physical-expr/src/functions.rs 
b/datafusion/physical-expr/src/functions.rs
index 799127c95c..543d7eb654 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -78,7 +78,7 @@ pub fn create_physical_expr(
         &format!("{fun}"),
         fun_expr,
         input_phy_exprs.to_vec(),
-        &data_type,
+        data_type,
         monotonicity,
     )))
 }
diff --git a/datafusion/physical-expr/src/scalar_function.rs 
b/datafusion/physical-expr/src/scalar_function.rs
index 63101c03bc..0a9d69720e 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -77,14 +77,14 @@ impl ScalarFunctionExpr {
         name: &str,
         fun: ScalarFunctionImplementation,
         args: Vec<Arc<dyn PhysicalExpr>>,
-        return_type: &DataType,
+        return_type: DataType,
         monotonicity: Option<FuncMonotonicity>,
     ) -> Self {
         Self {
             fun,
             name: name.to_owned(),
             args,
-            return_type: return_type.clone(),
+            return_type,
             monotonicity,
         }
     }
@@ -173,7 +173,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
             &self.name,
             self.fun.clone(),
             children,
-            self.return_type(),
+            self.return_type().clone(),
             self.monotonicity.clone(),
         )))
     }
diff --git a/datafusion/physical-expr/src/udf.rs 
b/datafusion/physical-expr/src/udf.rs
index af1e77cbf5..0ec1cf3f25 100644
--- a/datafusion/physical-expr/src/udf.rs
+++ b/datafusion/physical-expr/src/udf.rs
@@ -35,10 +35,10 @@ pub fn create_physical_expr(
         .collect::<Result<Vec<_>>>()?;
 
     Ok(Arc::new(ScalarFunctionExpr::new(
-        &fun.name,
-        fun.fun.clone(),
+        fun.name(),
+        fun.fun().clone(),
         input_phy_exprs.to_vec(),
-        (fun.return_type)(&input_exprs_types)?.as_ref(),
+        fun.return_type(&input_exprs_types)?,
         None,
     )))
 }
diff --git a/datafusion/physical-plan/src/udaf.rs 
b/datafusion/physical-plan/src/udaf.rs
index 7cc3cc7d59..94017efe97 100644
--- a/datafusion/physical-plan/src/udaf.rs
+++ b/datafusion/physical-plan/src/udaf.rs
@@ -50,7 +50,7 @@ pub fn create_aggregate_expr(
     Ok(Arc::new(AggregateFunctionExpr {
         fun: fun.clone(),
         args: input_phy_exprs.to_vec(),
-        data_type: (fun.return_type)(&input_exprs_types)?.as_ref().clone(),
+        data_type: fun.return_type(&input_exprs_types)?,
         name: name.into(),
     }))
 }
@@ -83,7 +83,9 @@ impl AggregateExpr for AggregateFunctionExpr {
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
-        let fields = (self.fun.state_type)(&self.data_type)?
+        let fields = self
+            .fun
+            .state_type(&self.data_type)?
             .iter()
             .enumerate()
             .map(|(i, data_type)| {
@@ -103,11 +105,11 @@ impl AggregateExpr for AggregateFunctionExpr {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        (self.fun.accumulator)(&self.data_type)
+        self.fun.accumulator(&self.data_type)
     }
 
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        let accumulator = (self.fun.accumulator)(&self.data_type)?;
+        let accumulator = self.fun.accumulator(&self.data_type)?;
 
         // Accumulators that have window frame startings different
         // than `UNBOUNDED PRECEDING`, such as `1 PRECEEDING`, need to
diff --git a/datafusion/physical-plan/src/windows/mod.rs 
b/datafusion/physical-plan/src/windows/mod.rs
index b6ed6e482f..541192c00d 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -255,7 +255,7 @@ fn create_udwf_window_expr(
         .collect::<Result<_>>()?;
 
     // figure out the output type
-    let data_type = (fun.return_type)(&input_types)?;
+    let data_type = fun.return_type(&input_types)?;
     Ok(Arc::new(WindowUDFExpr {
         fun: Arc::clone(fun),
         args: args.to_vec(),
@@ -272,7 +272,7 @@ struct WindowUDFExpr {
     /// Display name
     name: String,
     /// result type
-    data_type: Arc<DataType>,
+    data_type: DataType,
 }
 
 impl BuiltInWindowFunctionExpr for WindowUDFExpr {
@@ -282,11 +282,7 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
 
     fn field(&self) -> Result<Field> {
         let nullable = true;
-        Ok(Field::new(
-            &self.name,
-            self.data_type.as_ref().clone(),
-            nullable,
-        ))
+        Ok(Field::new(&self.name, self.data_type.clone(), nullable))
     }
 
     fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -294,7 +290,7 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
     }
 
     fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
-        (self.fun.partition_evaluator_factory)()
+        self.fun.partition_evaluator_factory()
     }
 
     fn name(&self) -> &str {
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index 491b7f6664..144f285310 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -613,12 +613,12 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
                     }
                     WindowFunction::AggregateUDF(aggr_udf) => {
                         protobuf::window_expr_node::WindowFunction::Udaf(
-                            aggr_udf.name.clone(),
+                            aggr_udf.name().to_string(),
                         )
                     }
                     WindowFunction::WindowUDF(window_udf) => {
                         protobuf::window_expr_node::WindowFunction::Udwf(
-                            window_udf.name.clone(),
+                            window_udf.name().to_string(),
                         )
                     }
                 };
@@ -769,7 +769,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
             }
             Expr::ScalarUDF(ScalarUDF { fun, args }) => Self {
                 expr_type: 
Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
-                    fun_name: fun.name.clone(),
+                    fun_name: fun.name().to_string(),
                     args: args
                         .iter()
                         .map(|expr| expr.try_into())
@@ -784,7 +784,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
             }) => Self {
                 expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
                     protobuf::AggregateUdfExprNode {
-                        fun_name: fun.name.clone(),
+                        fun_name: fun.name().to_string(),
                         args: args.iter().map(|expr| 
expr.try_into()).collect::<Result<
                             Vec<_>,
                             Error,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs 
b/datafusion/proto/src/physical_plan/from_proto.rs
index a628523f0e..22b74db9af 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -18,7 +18,6 @@
 //! Serde code to convert from protocol buffers to Rust data structures.
 
 use std::convert::{TryFrom, TryInto};
-use std::ops::Deref;
 use std::sync::Arc;
 
 use arrow::compute::SortOptions;
@@ -314,12 +313,12 @@ pub fn parse_physical_expr(
                 &e.name,
                 fun_expr,
                 args,
-                &convert_required!(e.return_type)?,
+                convert_required!(e.return_type)?,
                 None,
             ))
         }
         ExprType::ScalarUdf(e) => {
-            let scalar_fun = 
registry.udf(e.name.as_str())?.deref().clone().fun;
+            let scalar_fun = registry.udf(e.name.as_str())?.fun().clone();
 
             let args = e
                 .args
@@ -331,7 +330,7 @@ pub fn parse_physical_expr(
                 e.name.as_str(),
                 scalar_fun,
                 args,
-                &convert_required!(e.return_type)?,
+                convert_required!(e.return_type)?,
                 None,
             ))
         }
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs 
b/datafusion/proto/src/physical_plan/to_proto.rs
index 8201ef86b5..b8a590b0dc 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -84,10 +84,11 @@ impl TryFrom<Arc<dyn AggregateExpr>> for 
protobuf::PhysicalExprNode {
             .collect::<Result<Vec<_>>>()?;
 
         if let Some(a) = a.as_any().downcast_ref::<AggregateFunctionExpr>() {
+            let name = a.fun().name().to_string();
             return Ok(protobuf::PhysicalExprNode {
                     expr_type: 
Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
                         protobuf::PhysicalAggregateExprNode {
-                            aggregate_function: 
Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(a.fun().name.clone())),
+                            aggregate_function: 
Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
                             expr: expressions,
                             ordering_req,
                             distinct: false,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index cc76e8a19e..75af9d2e0a 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -1559,12 +1559,12 @@ fn roundtrip_window() {
         Ok(Box::new(DummyWindow {}))
     }
 
-    let dummy_window_udf = WindowUDF {
-        name: String::from("dummy_udwf"),
-        signature: Signature::exact(vec![DataType::Float64], 
Volatility::Immutable),
-        return_type: Arc::new(return_type),
-        partition_evaluator_factory: Arc::new(make_partition_evaluator),
-    };
+    let dummy_window_udf = WindowUDF::new(
+        "dummy_udwf",
+        &Signature::exact(vec![DataType::Float64], Volatility::Immutable),
+        &(Arc::new(return_type) as _),
+        &(Arc::new(make_partition_evaluator) as _),
+    );
 
     let test_expr6 = Expr::WindowFunction(expr::WindowFunction::new(
         WindowFunction::WindowUDF(Arc::new(dummy_window_udf.clone())),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 81e66d5ead..076ca41581 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -522,7 +522,7 @@ fn roundtrip_builtin_scalar_function() -> Result<()> {
         "acos",
         fun_expr,
         vec![col("a", &schema)?],
-        &DataType::Int64,
+        DataType::Int64,
         None,
     );
 
@@ -556,7 +556,7 @@ fn roundtrip_scalar_udf() -> Result<()> {
         "dummy",
         scalar_fn,
         vec![col("a", &schema)?],
-        &DataType::Int64,
+        DataType::Int64,
         None,
     );
 

Reply via email to