This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7f11125744 Make fields of `ScalarUDF` , `AggregateUDF` and `WindowUDF`
non `pub` (#8079)
7f11125744 is described below
commit 7f111257443d79259eacbe3cb2ace1bdd276e5fc
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Nov 15 04:59:23 2023 -0500
Make fields of `ScalarUDF` , `AggregateUDF` and `WindowUDF` non `pub`
(#8079)
* Make fields of ScalarUDF non pub
* Make fields of `WindowUDF` and `AggregateUDF` non pub.
* fix doc
---
datafusion/core/src/datasource/listing/helpers.rs | 2 +-
datafusion/core/src/execution/context/mod.rs | 6 +--
datafusion/core/src/physical_planner.rs | 4 +-
datafusion/expr/src/expr.rs | 14 +++---
datafusion/expr/src/expr_schema.rs | 4 +-
datafusion/expr/src/udaf.rs | 47 +++++++++++++++++---
datafusion/expr/src/udf.rs | 50 ++++++++++++++++++----
datafusion/expr/src/udwf.rs | 44 ++++++++++++++-----
datafusion/expr/src/window_function.rs | 12 ++----
datafusion/optimizer/src/analyzer/type_coercion.rs | 4 +-
.../src/simplify_expressions/expr_simplifier.rs | 2 +-
datafusion/physical-expr/src/functions.rs | 2 +-
datafusion/physical-expr/src/scalar_function.rs | 6 +--
datafusion/physical-expr/src/udf.rs | 6 +--
datafusion/physical-plan/src/udaf.rs | 10 +++--
datafusion/physical-plan/src/windows/mod.rs | 12 ++----
datafusion/proto/src/logical_plan/to_proto.rs | 8 ++--
datafusion/proto/src/physical_plan/from_proto.rs | 7 ++-
datafusion/proto/src/physical_plan/to_proto.rs | 3 +-
.../proto/tests/cases/roundtrip_logical_plan.rs | 12 +++---
.../proto/tests/cases/roundtrip_physical_plan.rs | 4 +-
21 files changed, 172 insertions(+), 87 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/helpers.rs
b/datafusion/core/src/datasource/listing/helpers.rs
index 1d929f4bd4..3d2a3dc928 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -102,7 +102,7 @@ pub fn expr_applicable_for_cols(col_names: &[String], expr:
&Expr) -> bool {
}
}
Expr::ScalarUDF(ScalarUDF { fun, .. }) => {
- match fun.signature.volatility {
+ match fun.signature().volatility {
Volatility::Immutable => VisitRecursion::Continue,
// TODO: Stable functions could be `applicable`, but that
would require access to the context
Volatility::Stable | Volatility::Volatile => {
diff --git a/datafusion/core/src/execution/context/mod.rs
b/datafusion/core/src/execution/context/mod.rs
index 9c500ec072..5c79c407b7 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -806,7 +806,7 @@ impl SessionContext {
self.state
.write()
.scalar_functions
- .insert(f.name.clone(), Arc::new(f));
+ .insert(f.name().to_string(), Arc::new(f));
}
/// Registers an aggregate UDF within this context.
@@ -820,7 +820,7 @@ impl SessionContext {
self.state
.write()
.aggregate_functions
- .insert(f.name.clone(), Arc::new(f));
+ .insert(f.name().to_string(), Arc::new(f));
}
/// Registers a window UDF within this context.
@@ -834,7 +834,7 @@ impl SessionContext {
self.state
.write()
.window_functions
- .insert(f.name.clone(), Arc::new(f));
+ .insert(f.name().to_string(), Arc::new(f));
}
/// Creates a [`DataFrame`] for reading a data source.
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 9c1d978acc..fffc51abeb 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -222,7 +222,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) ->
Result<String> {
create_function_physical_name(&func.fun.to_string(), false,
&func.args)
}
Expr::ScalarUDF(ScalarUDF { fun, args }) => {
- create_function_physical_name(&fun.name, false, args)
+ create_function_physical_name(fun.name(), false, args)
}
Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
create_function_physical_name(&fun.to_string(), false, args)
@@ -250,7 +250,7 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) ->
Result<String> {
for e in args {
names.push(create_physical_name(e, false)?);
}
- Ok(format!("{}({})", fun.name, names.join(",")))
+ Ok(format!("{}({})", fun.name(), names.join(",")))
}
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => Ok(format!(
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 97e4fcc327..2b2d30af3b 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -338,7 +338,7 @@ impl Between {
}
}
-/// ScalarFunction expression
+/// ScalarFunction expression invokes a built-in scalar function
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarFunction {
/// The function
@@ -354,7 +354,9 @@ impl ScalarFunction {
}
}
-/// ScalarUDF expression
+/// ScalarUDF expression invokes a user-defined scalar function [`ScalarUDF`]
+///
+/// [`ScalarUDF`]: crate::ScalarUDF
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct ScalarUDF {
/// The function
@@ -1200,7 +1202,7 @@ impl fmt::Display for Expr {
fmt_function(f, &func.fun.to_string(), false, &func.args, true)
}
Expr::ScalarUDF(ScalarUDF { fun, args }) => {
- fmt_function(f, &fun.name, false, args, true)
+ fmt_function(f, fun.name(), false, args, true)
}
Expr::WindowFunction(WindowFunction {
fun,
@@ -1247,7 +1249,7 @@ impl fmt::Display for Expr {
order_by,
..
}) => {
- fmt_function(f, &fun.name, false, args, true)?;
+ fmt_function(f, fun.name(), false, args, true)?;
if let Some(fe) = filter {
write!(f, " FILTER (WHERE {fe})")?;
}
@@ -1536,7 +1538,7 @@ fn create_name(e: &Expr) -> Result<String> {
create_function_name(&func.fun.to_string(), false, &func.args)
}
Expr::ScalarUDF(ScalarUDF { fun, args }) => {
- create_function_name(&fun.name, false, args)
+ create_function_name(fun.name(), false, args)
}
Expr::WindowFunction(WindowFunction {
fun,
@@ -1589,7 +1591,7 @@ fn create_name(e: &Expr) -> Result<String> {
if let Some(ob) = order_by {
info += &format!(" ORDER BY ([{}])", expr_vec_fmt!(ob));
}
- Ok(format!("{}({}){}", fun.name, names.join(","), info))
+ Ok(format!("{}({}){}", fun.name(), names.join(","), info))
}
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => {
diff --git a/datafusion/expr/src/expr_schema.rs
b/datafusion/expr/src/expr_schema.rs
index 5881feece1..0d06a12951 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -87,7 +87,7 @@ impl ExprSchemable for Expr {
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
- Ok((fun.return_type)(&data_types)?.as_ref().clone())
+ Ok(fun.return_type(&data_types)?)
}
Expr::ScalarFunction(ScalarFunction { fun, args }) => {
let arg_data_types = args
@@ -128,7 +128,7 @@ impl ExprSchemable for Expr {
.iter()
.map(|e| e.get_type(schema))
.collect::<Result<Vec<_>>>()?;
- Ok((fun.return_type)(&data_types)?.as_ref().clone())
+ fun.return_type(&data_types)
}
Expr::Not(_)
| Expr::IsNull(_)
diff --git a/datafusion/expr/src/udaf.rs b/datafusion/expr/src/udaf.rs
index 84e238a121..b06e97acc2 100644
--- a/datafusion/expr/src/udaf.rs
+++ b/datafusion/expr/src/udaf.rs
@@ -15,12 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-//! Udaf module contains functions and structs supporting user-defined
aggregate functions.
+//! [`AggregateUDF`]: User Defined Aggregate Functions
-use crate::Expr;
+use crate::{Accumulator, Expr};
use crate::{
AccumulatorFactoryFunction, ReturnTypeFunction, Signature,
StateTypeFunction,
};
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
use std::fmt::{self, Debug, Formatter};
use std::sync::Arc;
@@ -46,15 +48,15 @@ use std::sync::Arc;
#[derive(Clone)]
pub struct AggregateUDF {
/// name
- pub name: String,
+ name: String,
/// Signature (input arguments)
- pub signature: Signature,
+ signature: Signature,
/// Return type
- pub return_type: ReturnTypeFunction,
+ return_type: ReturnTypeFunction,
/// actual implementation
- pub accumulator: AccumulatorFactoryFunction,
+ accumulator: AccumulatorFactoryFunction,
/// the accumulator's state's description as a function of the return type
- pub state_type: StateTypeFunction,
+ state_type: StateTypeFunction,
}
impl Debug for AggregateUDF {
@@ -112,4 +114,35 @@ impl AggregateUDF {
order_by: None,
})
}
+
+ /// Returns this function's name
+ pub fn name(&self) -> &str {
+ &self.name
+ }
+
+ /// Returns this function's signature (what input types are accepted)
+ pub fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ /// Return the type of the function given its input types
+ pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+ // Old API returns an Arc of the datatype for some reason
+ let res = (self.return_type)(args)?;
+ Ok(res.as_ref().clone())
+ }
+
+ /// Return an accumulator for the given aggregate, given
+ /// its return datatype.
+ pub fn accumulator(&self, return_type: &DataType) -> Result<Box<dyn
Accumulator>> {
+ (self.accumulator)(return_type)
+ }
+
+ /// Return the type of the intermediate state used by this aggregator,
given
+ /// its return datatype. Supports multi-phase aggregations
+ pub fn state_type(&self, return_type: &DataType) -> Result<Vec<DataType>> {
+ // old API returns an Arc for some reason; try to unwrap it here
+ let res = (self.state_type)(return_type)?;
+ Ok(Arc::try_unwrap(res).unwrap_or_else(|res| res.as_ref().clone()))
+ }
}
diff --git a/datafusion/expr/src/udf.rs b/datafusion/expr/src/udf.rs
index be6c90aa59..22e56caaaf 100644
--- a/datafusion/expr/src/udf.rs
+++ b/datafusion/expr/src/udf.rs
@@ -15,23 +15,31 @@
// specific language governing permissions and limitations
// under the License.
-//! Udf module contains foundational types that are used to represent UDFs in
DataFusion.
+//! [`ScalarUDF`]: Scalar User Defined Functions
use crate::{Expr, ReturnTypeFunction, ScalarFunctionImplementation, Signature};
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
use std::fmt;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;
-/// Logical representation of a UDF.
+/// Logical representation of a Scalar User Defined Function.
+///
+/// A scalar function produces a single row output for each row of input.
+///
+/// This struct contains the information DataFusion needs to plan and invoke
+/// functions such as name, type signature, return type, and actual
implementation.
+///
#[derive(Clone)]
pub struct ScalarUDF {
- /// name
- pub name: String,
- /// signature
- pub signature: Signature,
- /// Return type
- pub return_type: ReturnTypeFunction,
+ /// The name of the function
+ name: String,
+ /// The signature (the types of arguments that are supported)
+ signature: Signature,
+ /// Function that returns the return type given the argument types
+ return_type: ReturnTypeFunction,
/// actual implementation
///
/// The fn param is the wrapped function but be aware that the function
will
@@ -40,7 +48,7 @@ pub struct ScalarUDF {
/// will be passed. In that case the single element is a null array to
indicate
/// the batch's row count (so that the generative zero-argument function
can know
/// the result array size).
- pub fun: ScalarFunctionImplementation,
+ fun: ScalarFunctionImplementation,
}
impl Debug for ScalarUDF {
@@ -89,4 +97,28 @@ impl ScalarUDF {
pub fn call(&self, args: Vec<Expr>) -> Expr {
Expr::ScalarUDF(crate::expr::ScalarUDF::new(Arc::new(self.clone()),
args))
}
+
+ /// Returns this function's name
+ pub fn name(&self) -> &str {
+ &self.name
+ }
+
+ /// Returns this function's signature (what input types are accepted)
+ pub fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ /// Return the type of the function given its input types
+ pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+ // Old API returns an Arc of the datatype for some reason
+ let res = (self.return_type)(args)?;
+ Ok(res.as_ref().clone())
+ }
+
+ /// Return the actual implementation
+ pub fn fun(&self) -> ScalarFunctionImplementation {
+ self.fun.clone()
+ }
+
+ // TODO maybe add an invoke() method that runs the actual function?
}
diff --git a/datafusion/expr/src/udwf.rs b/datafusion/expr/src/udwf.rs
index c0a2a8205a..c233ee84b3 100644
--- a/datafusion/expr/src/udwf.rs
+++ b/datafusion/expr/src/udwf.rs
@@ -15,17 +15,19 @@
// specific language governing permissions and limitations
// under the License.
-//! Support for user-defined window (UDWF) window functions
+//! [`WindowUDF`]: User Defined Window Functions
+use crate::{
+ Expr, PartitionEvaluator, PartitionEvaluatorFactory, ReturnTypeFunction,
Signature,
+ WindowFrame,
+};
+use arrow::datatypes::DataType;
+use datafusion_common::Result;
use std::{
fmt::{self, Debug, Display, Formatter},
sync::Arc,
};
-use crate::{
- Expr, PartitionEvaluatorFactory, ReturnTypeFunction, Signature,
WindowFrame,
-};
-
/// Logical representation of a user-defined window function (UDWF)
/// A UDWF is different from a UDF in that it is stateful across batches.
///
@@ -35,13 +37,13 @@ use crate::{
#[derive(Clone)]
pub struct WindowUDF {
/// name
- pub name: String,
+ name: String,
/// signature
- pub signature: Signature,
+ signature: Signature,
/// Return type
- pub return_type: ReturnTypeFunction,
+ return_type: ReturnTypeFunction,
/// Return the partition evaluator
- pub partition_evaluator_factory: PartitionEvaluatorFactory,
+ partition_evaluator_factory: PartitionEvaluatorFactory,
}
impl Debug for WindowUDF {
@@ -86,7 +88,7 @@ impl WindowUDF {
partition_evaluator_factory: &PartitionEvaluatorFactory,
) -> Self {
Self {
- name: name.to_owned(),
+ name: name.to_string(),
signature: signature.clone(),
return_type: return_type.clone(),
partition_evaluator_factory: partition_evaluator_factory.clone(),
@@ -115,4 +117,26 @@ impl WindowUDF {
window_frame,
})
}
+
+ /// Returns this function's name
+ pub fn name(&self) -> &str {
+ &self.name
+ }
+
+ /// Returns this function's signature (what input types are accepted)
+ pub fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ /// Return the type of the function given its input types
+ pub fn return_type(&self, args: &[DataType]) -> Result<DataType> {
+ // Old API returns an Arc of the datatype for some reason
+ let res = (self.return_type)(args)?;
+ Ok(res.as_ref().clone())
+ }
+
+ /// Return a `PartitionEvaluator` for evaluating this window function
+ pub fn partition_evaluator_factory(&self) -> Result<Box<dyn
PartitionEvaluator>> {
+ (self.partition_evaluator_factory)()
+ }
}
diff --git a/datafusion/expr/src/window_function.rs
b/datafusion/expr/src/window_function.rs
index 463cceafeb..35b7bded70 100644
--- a/datafusion/expr/src/window_function.rs
+++ b/datafusion/expr/src/window_function.rs
@@ -171,12 +171,8 @@ impl WindowFunction {
WindowFunction::BuiltInWindowFunction(fun) => {
fun.return_type(input_expr_types)
}
- WindowFunction::AggregateUDF(fun) => {
- Ok((*(fun.return_type)(input_expr_types)?).clone())
- }
- WindowFunction::WindowUDF(fun) => {
- Ok((*(fun.return_type)(input_expr_types)?).clone())
- }
+ WindowFunction::AggregateUDF(fun) =>
fun.return_type(input_expr_types),
+ WindowFunction::WindowUDF(fun) =>
fun.return_type(input_expr_types),
}
}
}
@@ -234,8 +230,8 @@ impl WindowFunction {
match self {
WindowFunction::AggregateFunction(fun) => fun.signature(),
WindowFunction::BuiltInWindowFunction(fun) => fun.signature(),
- WindowFunction::AggregateUDF(fun) => fun.signature.clone(),
- WindowFunction::WindowUDF(fun) => fun.signature.clone(),
+ WindowFunction::AggregateUDF(fun) => fun.signature().clone(),
+ WindowFunction::WindowUDF(fun) => fun.signature().clone(),
}
}
}
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index bfdbec3901..57dabbfee4 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -323,7 +323,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
let new_expr = coerce_arguments_for_signature(
args.as_slice(),
&self.schema,
- &fun.signature,
+ fun.signature(),
)?;
Ok(Expr::ScalarUDF(ScalarUDF::new(fun, new_expr)))
}
@@ -364,7 +364,7 @@ impl TreeNodeRewriter for TypeCoercionRewriter {
let new_expr = coerce_arguments_for_signature(
args.as_slice(),
&self.schema,
- &fun.signature,
+ fun.signature(),
)?;
let expr = Expr::AggregateUDF(expr::AggregateUDF::new(
fun, new_expr, filter, order_by,
diff --git a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
index c5a1aacce7..947a6f6070 100644
--- a/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
+++ b/datafusion/optimizer/src/simplify_expressions/expr_simplifier.rs
@@ -349,7 +349,7 @@ impl<'a> ConstEvaluator<'a> {
Self::volatility_ok(fun.volatility())
}
Expr::ScalarUDF(expr::ScalarUDF { fun, .. }) => {
- Self::volatility_ok(fun.signature.volatility)
+ Self::volatility_ok(fun.signature().volatility)
}
Expr::Literal(_)
| Expr::BinaryExpr { .. }
diff --git a/datafusion/physical-expr/src/functions.rs
b/datafusion/physical-expr/src/functions.rs
index 799127c95c..543d7eb654 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -78,7 +78,7 @@ pub fn create_physical_expr(
&format!("{fun}"),
fun_expr,
input_phy_exprs.to_vec(),
- &data_type,
+ data_type,
monotonicity,
)))
}
diff --git a/datafusion/physical-expr/src/scalar_function.rs
b/datafusion/physical-expr/src/scalar_function.rs
index 63101c03bc..0a9d69720e 100644
--- a/datafusion/physical-expr/src/scalar_function.rs
+++ b/datafusion/physical-expr/src/scalar_function.rs
@@ -77,14 +77,14 @@ impl ScalarFunctionExpr {
name: &str,
fun: ScalarFunctionImplementation,
args: Vec<Arc<dyn PhysicalExpr>>,
- return_type: &DataType,
+ return_type: DataType,
monotonicity: Option<FuncMonotonicity>,
) -> Self {
Self {
fun,
name: name.to_owned(),
args,
- return_type: return_type.clone(),
+ return_type,
monotonicity,
}
}
@@ -173,7 +173,7 @@ impl PhysicalExpr for ScalarFunctionExpr {
&self.name,
self.fun.clone(),
children,
- self.return_type(),
+ self.return_type().clone(),
self.monotonicity.clone(),
)))
}
diff --git a/datafusion/physical-expr/src/udf.rs
b/datafusion/physical-expr/src/udf.rs
index af1e77cbf5..0ec1cf3f25 100644
--- a/datafusion/physical-expr/src/udf.rs
+++ b/datafusion/physical-expr/src/udf.rs
@@ -35,10 +35,10 @@ pub fn create_physical_expr(
.collect::<Result<Vec<_>>>()?;
Ok(Arc::new(ScalarFunctionExpr::new(
- &fun.name,
- fun.fun.clone(),
+ fun.name(),
+ fun.fun().clone(),
input_phy_exprs.to_vec(),
- (fun.return_type)(&input_exprs_types)?.as_ref(),
+ fun.return_type(&input_exprs_types)?,
None,
)))
}
diff --git a/datafusion/physical-plan/src/udaf.rs
b/datafusion/physical-plan/src/udaf.rs
index 7cc3cc7d59..94017efe97 100644
--- a/datafusion/physical-plan/src/udaf.rs
+++ b/datafusion/physical-plan/src/udaf.rs
@@ -50,7 +50,7 @@ pub fn create_aggregate_expr(
Ok(Arc::new(AggregateFunctionExpr {
fun: fun.clone(),
args: input_phy_exprs.to_vec(),
- data_type: (fun.return_type)(&input_exprs_types)?.as_ref().clone(),
+ data_type: fun.return_type(&input_exprs_types)?,
name: name.into(),
}))
}
@@ -83,7 +83,9 @@ impl AggregateExpr for AggregateFunctionExpr {
}
fn state_fields(&self) -> Result<Vec<Field>> {
- let fields = (self.fun.state_type)(&self.data_type)?
+ let fields = self
+ .fun
+ .state_type(&self.data_type)?
.iter()
.enumerate()
.map(|(i, data_type)| {
@@ -103,11 +105,11 @@ impl AggregateExpr for AggregateFunctionExpr {
}
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- (self.fun.accumulator)(&self.data_type)
+ self.fun.accumulator(&self.data_type)
}
fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
- let accumulator = (self.fun.accumulator)(&self.data_type)?;
+ let accumulator = self.fun.accumulator(&self.data_type)?;
// Accumulators that have window frame startings different
// than `UNBOUNDED PRECEDING`, such as `1 PRECEEDING`, need to
diff --git a/datafusion/physical-plan/src/windows/mod.rs
b/datafusion/physical-plan/src/windows/mod.rs
index b6ed6e482f..541192c00d 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -255,7 +255,7 @@ fn create_udwf_window_expr(
.collect::<Result<_>>()?;
// figure out the output type
- let data_type = (fun.return_type)(&input_types)?;
+ let data_type = fun.return_type(&input_types)?;
Ok(Arc::new(WindowUDFExpr {
fun: Arc::clone(fun),
args: args.to_vec(),
@@ -272,7 +272,7 @@ struct WindowUDFExpr {
/// Display name
name: String,
/// result type
- data_type: Arc<DataType>,
+ data_type: DataType,
}
impl BuiltInWindowFunctionExpr for WindowUDFExpr {
@@ -282,11 +282,7 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
fn field(&self) -> Result<Field> {
let nullable = true;
- Ok(Field::new(
- &self.name,
- self.data_type.as_ref().clone(),
- nullable,
- ))
+ Ok(Field::new(&self.name, self.data_type.clone(), nullable))
}
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
@@ -294,7 +290,7 @@ impl BuiltInWindowFunctionExpr for WindowUDFExpr {
}
fn create_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
- (self.fun.partition_evaluator_factory)()
+ self.fun.partition_evaluator_factory()
}
fn name(&self) -> &str {
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index 491b7f6664..144f285310 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -613,12 +613,12 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
}
WindowFunction::AggregateUDF(aggr_udf) => {
protobuf::window_expr_node::WindowFunction::Udaf(
- aggr_udf.name.clone(),
+ aggr_udf.name().to_string(),
)
}
WindowFunction::WindowUDF(window_udf) => {
protobuf::window_expr_node::WindowFunction::Udwf(
- window_udf.name.clone(),
+ window_udf.name().to_string(),
)
}
};
@@ -769,7 +769,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
}
Expr::ScalarUDF(ScalarUDF { fun, args }) => Self {
expr_type:
Some(ExprType::ScalarUdfExpr(protobuf::ScalarUdfExprNode {
- fun_name: fun.name.clone(),
+ fun_name: fun.name().to_string(),
args: args
.iter()
.map(|expr| expr.try_into())
@@ -784,7 +784,7 @@ impl TryFrom<&Expr> for protobuf::LogicalExprNode {
}) => Self {
expr_type: Some(ExprType::AggregateUdfExpr(Box::new(
protobuf::AggregateUdfExprNode {
- fun_name: fun.name.clone(),
+ fun_name: fun.name().to_string(),
args: args.iter().map(|expr|
expr.try_into()).collect::<Result<
Vec<_>,
Error,
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs
b/datafusion/proto/src/physical_plan/from_proto.rs
index a628523f0e..22b74db9af 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -18,7 +18,6 @@
//! Serde code to convert from protocol buffers to Rust data structures.
use std::convert::{TryFrom, TryInto};
-use std::ops::Deref;
use std::sync::Arc;
use arrow::compute::SortOptions;
@@ -314,12 +313,12 @@ pub fn parse_physical_expr(
&e.name,
fun_expr,
args,
- &convert_required!(e.return_type)?,
+ convert_required!(e.return_type)?,
None,
))
}
ExprType::ScalarUdf(e) => {
- let scalar_fun =
registry.udf(e.name.as_str())?.deref().clone().fun;
+ let scalar_fun = registry.udf(e.name.as_str())?.fun().clone();
let args = e
.args
@@ -331,7 +330,7 @@ pub fn parse_physical_expr(
e.name.as_str(),
scalar_fun,
args,
- &convert_required!(e.return_type)?,
+ convert_required!(e.return_type)?,
None,
))
}
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs
b/datafusion/proto/src/physical_plan/to_proto.rs
index 8201ef86b5..b8a590b0dc 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -84,10 +84,11 @@ impl TryFrom<Arc<dyn AggregateExpr>> for
protobuf::PhysicalExprNode {
.collect::<Result<Vec<_>>>()?;
if let Some(a) = a.as_any().downcast_ref::<AggregateFunctionExpr>() {
+ let name = a.fun().name().to_string();
return Ok(protobuf::PhysicalExprNode {
expr_type:
Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
protobuf::PhysicalAggregateExprNode {
- aggregate_function:
Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(a.fun().name.clone())),
+ aggregate_function:
Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
expr: expressions,
ordering_req,
distinct: false,
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index cc76e8a19e..75af9d2e0a 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -1559,12 +1559,12 @@ fn roundtrip_window() {
Ok(Box::new(DummyWindow {}))
}
- let dummy_window_udf = WindowUDF {
- name: String::from("dummy_udwf"),
- signature: Signature::exact(vec![DataType::Float64],
Volatility::Immutable),
- return_type: Arc::new(return_type),
- partition_evaluator_factory: Arc::new(make_partition_evaluator),
- };
+ let dummy_window_udf = WindowUDF::new(
+ "dummy_udwf",
+ &Signature::exact(vec![DataType::Float64], Volatility::Immutable),
+ &(Arc::new(return_type) as _),
+ &(Arc::new(make_partition_evaluator) as _),
+ );
let test_expr6 = Expr::WindowFunction(expr::WindowFunction::new(
WindowFunction::WindowUDF(Arc::new(dummy_window_udf.clone())),
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 81e66d5ead..076ca41581 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -522,7 +522,7 @@ fn roundtrip_builtin_scalar_function() -> Result<()> {
"acos",
fun_expr,
vec![col("a", &schema)?],
- &DataType::Int64,
+ DataType::Int64,
None,
);
@@ -556,7 +556,7 @@ fn roundtrip_scalar_udf() -> Result<()> {
"dummy",
scalar_fn,
vec![col("a", &schema)?],
- &DataType::Int64,
+ DataType::Int64,
None,
);