This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bece785174 move Floor, Gcd, Lcm, Pi to datafusion-functions (#9976)
bece785174 is described below
commit bece785174c199f4fde4343a27c2213fae11bfb8
Author: Bruce Ritchie <[email protected]>
AuthorDate: Mon Apr 8 12:03:00 2024 -0400
move Floor, Gcd, Lcm, Pi to datafusion-functions (#9976)
* move Floor, Gcd, Lcm, Pi to datafusion-functions
---
datafusion/expr/src/built_in_function.rs | 29 +----
datafusion/expr/src/expr_fn.rs | 17 +--
datafusion/functions/src/math/gcd.rs | 145 +++++++++++++++++++++
datafusion/functions/src/math/lcm.rs | 126 ++++++++++++++++++
datafusion/functions/src/math/mod.rs | 14 +-
datafusion/functions/src/math/pi.rs | 76 +++++++++++
datafusion/optimizer/src/analyzer/type_coercion.rs | 30 +++--
.../physical-expr/src/equivalence/ordering.rs | 44 ++++---
.../physical-expr/src/equivalence/projection.rs | 41 +++---
.../physical-expr/src/equivalence/properties.rs | 41 +++---
datafusion/physical-expr/src/functions.rs | 10 +-
datafusion/physical-expr/src/math_expressions.rs | 118 -----------------
datafusion/physical-expr/src/udf.rs | 55 ++------
datafusion/physical-expr/src/utils/mod.rs | 99 +++++++++++++-
datafusion/proto/proto/datafusion.proto | 8 +-
datafusion/proto/src/generated/pbjson.rs | 12 --
datafusion/proto/src/generated/prost.rs | 16 +--
datafusion/proto/src/logical_plan/from_proto.rs | 20 +--
datafusion/proto/src/logical_plan/to_proto.rs | 4 -
datafusion/sql/src/expr/function.rs | 20 ++-
datafusion/sql/src/expr/mod.rs | 7 +-
21 files changed, 588 insertions(+), 344 deletions(-)
diff --git a/datafusion/expr/src/built_in_function.rs b/datafusion/expr/src/built_in_function.rs
index dc1fc98a5c..7426ccd938 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -45,20 +45,12 @@ pub enum BuiltinScalarFunction {
Exp,
/// factorial
Factorial,
- /// floor
- Floor,
- /// gcd, Greatest common divisor
- Gcd,
- /// lcm, Least common multiple
- Lcm,
/// iszero
Iszero,
/// log, same as log10
Log,
/// nanvl
Nanvl,
- /// pi
- Pi,
/// power
Power,
/// round
@@ -135,13 +127,9 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Coalesce => Volatility::Immutable,
BuiltinScalarFunction::Exp => Volatility::Immutable,
BuiltinScalarFunction::Factorial => Volatility::Immutable,
- BuiltinScalarFunction::Floor => Volatility::Immutable,
- BuiltinScalarFunction::Gcd => Volatility::Immutable,
BuiltinScalarFunction::Iszero => Volatility::Immutable,
- BuiltinScalarFunction::Lcm => Volatility::Immutable,
BuiltinScalarFunction::Log => Volatility::Immutable,
BuiltinScalarFunction::Nanvl => Volatility::Immutable,
- BuiltinScalarFunction::Pi => Volatility::Immutable,
BuiltinScalarFunction::Power => Volatility::Immutable,
BuiltinScalarFunction::Round => Volatility::Immutable,
BuiltinScalarFunction::Cot => Volatility::Immutable,
@@ -183,13 +171,10 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::InitCap => {
utf8_to_str_type(&input_expr_types[0], "initcap")
}
- BuiltinScalarFunction::Pi => Ok(Float64),
BuiltinScalarFunction::Random => Ok(Float64),
BuiltinScalarFunction::EndsWith => Ok(Boolean),
- BuiltinScalarFunction::Factorial
- | BuiltinScalarFunction::Gcd
- | BuiltinScalarFunction::Lcm => Ok(Int64),
+ BuiltinScalarFunction::Factorial => Ok(Int64),
BuiltinScalarFunction::Power => match &input_expr_types[0] {
Int64 => Ok(Int64),
@@ -210,7 +195,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Ceil
| BuiltinScalarFunction::Exp
- | BuiltinScalarFunction::Floor
| BuiltinScalarFunction::Round
| BuiltinScalarFunction::Trunc
| BuiltinScalarFunction::Cot => match input_expr_types[0] {
@@ -248,7 +232,6 @@ impl BuiltinScalarFunction {
],
self.volatility(),
),
- BuiltinScalarFunction::Pi => Signature::exact(vec![],
self.volatility()),
BuiltinScalarFunction::Random => Signature::exact(vec![],
self.volatility()),
BuiltinScalarFunction::Power => Signature::one_of(
vec![Exact(vec![Int64, Int64]), Exact(vec![Float64, Float64])],
@@ -289,12 +272,8 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Factorial => {
Signature::uniform(1, vec![Int64], self.volatility())
}
- BuiltinScalarFunction::Gcd | BuiltinScalarFunction::Lcm => {
- Signature::uniform(2, vec![Int64], self.volatility())
- }
BuiltinScalarFunction::Ceil
| BuiltinScalarFunction::Exp
- | BuiltinScalarFunction::Floor
| BuiltinScalarFunction::Cot => {
// math expressions expect 1 argument of type f64 or f32
// priority is given to f64 because e.g. `sqrt(1i32)` is in IR (real numbers) and thus we
@@ -319,10 +298,8 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Ceil
| BuiltinScalarFunction::Exp
| BuiltinScalarFunction::Factorial
- | BuiltinScalarFunction::Floor
| BuiltinScalarFunction::Round
| BuiltinScalarFunction::Trunc
- | BuiltinScalarFunction::Pi
) {
Some(vec![Some(true)])
} else if *self == BuiltinScalarFunction::Log {
@@ -339,13 +316,9 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Cot => &["cot"],
BuiltinScalarFunction::Exp => &["exp"],
BuiltinScalarFunction::Factorial => &["factorial"],
- BuiltinScalarFunction::Floor => &["floor"],
- BuiltinScalarFunction::Gcd => &["gcd"],
BuiltinScalarFunction::Iszero => &["iszero"],
- BuiltinScalarFunction::Lcm => &["lcm"],
BuiltinScalarFunction::Log => &["log"],
BuiltinScalarFunction::Nanvl => &["nanvl"],
- BuiltinScalarFunction::Pi => &["pi"],
BuiltinScalarFunction::Power => &["power", "pow"],
BuiltinScalarFunction::Random => &["random"],
BuiltinScalarFunction::Round => &["round"],
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index f68685a87f..6c811ff064 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -297,11 +297,6 @@ pub fn concat_ws(sep: Expr, values: Vec<Expr>) -> Expr {
))
}
-/// Returns an approximate value of π
-pub fn pi() -> Expr {
- Expr::ScalarFunction(ScalarFunction::new(BuiltinScalarFunction::Pi,
vec![]))
-}
-
/// Returns a random value in the range 0.0 <= x < 1.0
pub fn random() -> Expr {
Expr::ScalarFunction(ScalarFunction::new(BuiltinScalarFunction::Random,
vec![]))
@@ -537,12 +532,6 @@ macro_rules! nary_scalar_expr {
// math functions
scalar_expr!(Cot, cot, num, "cotangent of a number");
scalar_expr!(Factorial, factorial, num, "factorial");
-scalar_expr!(
- Floor,
- floor,
- num,
- "nearest integer less than or equal to argument"
-);
scalar_expr!(
Ceil,
ceil,
@@ -556,8 +545,7 @@ nary_scalar_expr!(
"truncate toward zero, with optional precision"
);
scalar_expr!(Exp, exp, num, "exponential");
-scalar_expr!(Gcd, gcd, arg_1 arg_2, "greatest common divisor");
-scalar_expr!(Lcm, lcm, arg_1 arg_2, "least common multiple");
+
scalar_expr!(Power, power, base exponent, "`base` raised to the power of
`exponent`");
scalar_expr!(Log, log, base x, "logarithm of a `x` for a particular `base`");
@@ -974,7 +962,6 @@ mod test {
fn scalar_function_definitions() {
test_unary_scalar_expr!(Cot, cot);
test_unary_scalar_expr!(Factorial, factorial);
- test_unary_scalar_expr!(Floor, floor);
test_unary_scalar_expr!(Ceil, ceil);
test_nary_scalar_expr!(Round, round, input);
test_nary_scalar_expr!(Round, round, input, decimal_places);
@@ -984,8 +971,6 @@ mod test {
test_scalar_expr!(Nanvl, nanvl, x, y);
test_scalar_expr!(Iszero, iszero, input);
- test_scalar_expr!(Gcd, gcd, arg_1, arg_2);
- test_scalar_expr!(Lcm, lcm, arg_1, arg_2);
test_scalar_expr!(InitCap, initcap, string);
test_scalar_expr!(EndsWith, ends_with, string, characters);
}
diff --git a/datafusion/functions/src/math/gcd.rs b/datafusion/functions/src/math/gcd.rs
new file mode 100644
index 0000000000..41c9e4e233
--- /dev/null
+++ b/datafusion/functions/src/math/gcd.rs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, Int64Array};
+use std::any::Any;
+use std::mem::swap;
+use std::sync::Arc;
+
+use arrow::datatypes::DataType;
+use arrow::datatypes::DataType::Int64;
+
+use crate::utils::make_scalar_function;
+use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_expr::ColumnarValue;
+use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
+
+#[derive(Debug)]
+pub struct GcdFunc {
+ signature: Signature,
+}
+
+impl Default for GcdFunc {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl GcdFunc {
+ pub fn new() -> Self {
+ use DataType::*;
+ Self {
+ signature: Signature::uniform(2, vec![Int64],
Volatility::Immutable),
+ }
+ }
+}
+
+impl ScalarUDFImpl for GcdFunc {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "gcd"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(Int64)
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ make_scalar_function(gcd, vec![])(args)
+ }
+}
+
+/// Gcd SQL function
+fn gcd(args: &[ArrayRef]) -> Result<ArrayRef> {
+ match args[0].data_type() {
+ Int64 => Ok(Arc::new(make_function_inputs2!(
+ &args[0],
+ &args[1],
+ "x",
+ "y",
+ Int64Array,
+ Int64Array,
+ { compute_gcd }
+ )) as ArrayRef),
+ other => exec_err!("Unsupported data type {other:?} for function gcd"),
+ }
+}
+
+/// Computes greatest common divisor using Binary GCD algorithm.
+pub fn compute_gcd(x: i64, y: i64) -> i64 {
+ let mut a = x.wrapping_abs();
+ let mut b = y.wrapping_abs();
+
+ if a == 0 {
+ return b;
+ }
+ if b == 0 {
+ return a;
+ }
+
+ let shift = (a | b).trailing_zeros();
+ a >>= shift;
+ b >>= shift;
+ a >>= a.trailing_zeros();
+
+ loop {
+ b >>= b.trailing_zeros();
+ if a > b {
+ swap(&mut a, &mut b);
+ }
+
+ b -= a;
+
+ if b == 0 {
+ return a << shift;
+ }
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use std::sync::Arc;
+
+ use arrow::array::{ArrayRef, Int64Array};
+
+ use crate::math::gcd::gcd;
+ use datafusion_common::cast::as_int64_array;
+
+ #[test]
+ fn test_gcd_i64() {
+ let args: Vec<ArrayRef> = vec![
+ Arc::new(Int64Array::from(vec![0, 3, 25, -16])), // x
+ Arc::new(Int64Array::from(vec![0, -2, 15, 8])), // y
+ ];
+
+ let result = gcd(&args).expect("failed to initialize function gcd");
+ let ints = as_int64_array(&result).expect("failed to initialize
function gcd");
+
+ assert_eq!(ints.len(), 4);
+ assert_eq!(ints.value(0), 0);
+ assert_eq!(ints.value(1), 1);
+ assert_eq!(ints.value(2), 5);
+ assert_eq!(ints.value(3), 8);
+ }
+}
diff --git a/datafusion/functions/src/math/lcm.rs b/datafusion/functions/src/math/lcm.rs
new file mode 100644
index 0000000000..3674f7371d
--- /dev/null
+++ b/datafusion/functions/src/math/lcm.rs
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array};
+use arrow::datatypes::DataType;
+use arrow::datatypes::DataType::Int64;
+
+use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_expr::ColumnarValue;
+use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
+
+use crate::math::gcd::compute_gcd;
+use crate::utils::make_scalar_function;
+
+#[derive(Debug)]
+pub struct LcmFunc {
+ signature: Signature,
+}
+
+impl Default for LcmFunc {
+ fn default() -> Self {
+ LcmFunc::new()
+ }
+}
+
+impl LcmFunc {
+ pub fn new() -> Self {
+ use DataType::*;
+ Self {
+ signature: Signature::uniform(2, vec![Int64],
Volatility::Immutable),
+ }
+ }
+}
+
+impl ScalarUDFImpl for LcmFunc {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "lcm"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(Int64)
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ make_scalar_function(lcm, vec![])(args)
+ }
+}
+
+/// Lcm SQL function
+fn lcm(args: &[ArrayRef]) -> Result<ArrayRef> {
+ let compute_lcm = |x: i64, y: i64| {
+ let a = x.wrapping_abs();
+ let b = y.wrapping_abs();
+
+ if a == 0 || b == 0 {
+ return 0;
+ }
+ a / compute_gcd(a, b) * b
+ };
+
+ match args[0].data_type() {
+ Int64 => Ok(Arc::new(make_function_inputs2!(
+ &args[0],
+ &args[1],
+ "x",
+ "y",
+ Int64Array,
+ Int64Array,
+ { compute_lcm }
+ )) as ArrayRef),
+ other => exec_err!("Unsupported data type {other:?} for function lcm"),
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use std::sync::Arc;
+
+ use arrow::array::{ArrayRef, Int64Array};
+
+ use datafusion_common::cast::as_int64_array;
+
+ use crate::math::lcm::lcm;
+
+ #[test]
+ fn test_lcm_i64() {
+ let args: Vec<ArrayRef> = vec![
+ Arc::new(Int64Array::from(vec![0, 3, 25, -16])), // x
+ Arc::new(Int64Array::from(vec![0, -2, 15, 8])), // y
+ ];
+
+ let result = lcm(&args).expect("failed to initialize function lcm");
+ let ints = as_int64_array(&result).expect("failed to initialize
function lcm");
+
+ assert_eq!(ints.len(), 4);
+ assert_eq!(ints.value(0), 0);
+ assert_eq!(ints.value(1), 6);
+ assert_eq!(ints.value(2), 75);
+ assert_eq!(ints.value(3), 16);
+ }
+}
diff --git a/datafusion/functions/src/math/mod.rs b/datafusion/functions/src/math/mod.rs
index f241c8b325..3a1f7cc13b 100644
--- a/datafusion/functions/src/math/mod.rs
+++ b/datafusion/functions/src/math/mod.rs
@@ -18,11 +18,17 @@
//! "math" DataFusion functions
pub mod abs;
+pub mod gcd;
+pub mod lcm;
pub mod nans;
+pub mod pi;
// Create UDFs
make_udf_function!(nans::IsNanFunc, ISNAN, isnan);
make_udf_function!(abs::AbsFunc, ABS, abs);
+make_udf_function!(gcd::GcdFunc, GCD, gcd);
+make_udf_function!(lcm::LcmFunc, LCM, lcm);
+make_udf_function!(pi::PiFunc, PI, pi);
make_math_unary_udf!(Log2Func, LOG2, log2, log2, Some(vec![Some(true)]));
make_math_unary_udf!(Log10Func, LOG10, log10, log10, Some(vec![Some(true)]));
@@ -50,6 +56,8 @@ make_math_unary_udf!(CosFunc, COS, cos, cos, None);
make_math_unary_udf!(CoshFunc, COSH, cosh, cosh, None);
make_math_unary_udf!(DegreesFunc, DEGREES, degrees, to_degrees, None);
+make_math_unary_udf!(FloorFunc, FLOOR, floor, floor, Some(vec![Some(true)]));
+
// Export the functions out of this package, both as expr_fn as well as a list of functions
export_functions!(
(
@@ -86,5 +94,9 @@ export_functions!(
(cbrt, num, "cube root of a number"),
(cos, num, "cosine"),
(cosh, num, "hyperbolic cosine"),
- (degrees, num, "converts radians to degrees")
+ (degrees, num, "converts radians to degrees"),
+ (gcd, x y, "greatest common divisor"),
+ (lcm, x y, "least common multiple"),
+ (floor, num, "nearest integer less than or equal to argument"),
+ (pi, , "Returns an approximate value of π")
);
diff --git a/datafusion/functions/src/math/pi.rs b/datafusion/functions/src/math/pi.rs
new file mode 100644
index 0000000000..0801e79751
--- /dev/null
+++ b/datafusion/functions/src/math/pi.rs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::Float64Array;
+use arrow::datatypes::DataType;
+use arrow::datatypes::DataType::Float64;
+
+use datafusion_common::{exec_err, Result};
+use datafusion_expr::{ColumnarValue, FuncMonotonicity, Volatility};
+use datafusion_expr::{ScalarUDFImpl, Signature};
+
+#[derive(Debug)]
+pub struct PiFunc {
+ signature: Signature,
+}
+
+impl Default for PiFunc {
+ fn default() -> Self {
+ PiFunc::new()
+ }
+}
+
+impl PiFunc {
+ pub fn new() -> Self {
+ Self {
+ signature: Signature::exact(vec![], Volatility::Immutable),
+ }
+ }
+}
+
+impl ScalarUDFImpl for PiFunc {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "pi"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ Ok(Float64)
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ if !matches!(&args[0], ColumnarValue::Array(_)) {
+ return exec_err!("Expect pi function to take no param");
+ }
+ let array = Float64Array::from_value(std::f64::consts::PI, 1);
+ Ok(ColumnarValue::Array(Arc::new(array)))
+ }
+
+ fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
+ Ok(Some(vec![Some(true)]))
+ }
+}
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 04de243fba..1ea8b9534e 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -19,9 +19,8 @@
use std::sync::Arc;
-use crate::analyzer::AnalyzerRule;
-
use arrow::datatypes::{DataType, IntervalUnit};
+
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
use datafusion_common::{
@@ -51,6 +50,8 @@ use datafusion_expr::{
WindowFrameUnits,
};
+use crate::analyzer::AnalyzerRule;
+
#[derive(Default)]
pub struct TypeCoercion {}
@@ -758,25 +759,25 @@ mod test {
use std::any::Any;
use std::sync::{Arc, OnceLock};
- use crate::analyzer::type_coercion::{
- coerce_case_expression, TypeCoercion, TypeCoercionRewriter,
- };
- use crate::test::assert_analyzed_plan_eq;
-
use arrow::datatypes::{DataType, Field, TimeUnit};
+
use datafusion_common::tree_node::{TransformedResult, TreeNode};
use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue};
use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction};
use datafusion_expr::logical_plan::{EmptyRelation, Projection};
use datafusion_expr::{
cast, col, concat, concat_ws, create_udaf, is_true, lit,
- AccumulatorFactoryFunction, AggregateFunction, AggregateUDF,
BinaryExpr,
- BuiltinScalarFunction, Case, ColumnarValue, Expr, ExprSchemable,
Filter,
- LogicalPlan, Operator, ScalarUDF, ScalarUDFImpl, Signature,
SimpleAggregateUDF,
- Subquery, Volatility,
+ AccumulatorFactoryFunction, AggregateFunction, AggregateUDF,
BinaryExpr, Case,
+ ColumnarValue, Expr, ExprSchemable, Filter, LogicalPlan, Operator,
ScalarUDF,
+ ScalarUDFImpl, Signature, SimpleAggregateUDF, Subquery, Volatility,
};
use datafusion_physical_expr::expressions::AvgAccumulator;
+ use crate::analyzer::type_coercion::{
+ coerce_case_expression, TypeCoercion, TypeCoercionRewriter,
+ };
+ use crate::test::assert_analyzed_plan_eq;
+
fn empty() -> Arc<LogicalPlan> {
Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
@@ -875,14 +876,15 @@ mod test {
// test that automatic argument type coercion for scalar functions work
let empty = empty();
let lit_expr = lit(10i64);
- let fun: BuiltinScalarFunction = BuiltinScalarFunction::Floor;
+ let fun = ScalarUDF::new_from_impl(TestScalarUDF {});
let scalar_function_expr =
- Expr::ScalarFunction(ScalarFunction::new(fun, vec![lit_expr]));
+ Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(fun),
vec![lit_expr]));
let plan = LogicalPlan::Projection(Projection::try_new(
vec![scalar_function_expr],
empty,
)?);
- let expected = "Projection: floor(CAST(Int64(10) AS Float64))\n
EmptyRelation";
+ let expected =
+ "Projection: TestScalarUDF(CAST(Int64(10) AS Float32))\n
EmptyRelation";
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), &plan, expected)
}
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs b/datafusion/physical-expr/src/equivalence/ordering.rs
index 1364d3a8c0..688cdf798b 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use arrow_schema::SortOptions;
use std::hash::Hash;
use std::sync::Arc;
+use arrow_schema::SortOptions;
+
use crate::equivalence::add_offset_to_expr;
use crate::{LexOrdering, PhysicalExpr, PhysicalSortExpr};
@@ -220,6 +221,16 @@ fn resolve_overlap(orderings: &mut [LexOrdering], idx: usize, pre_idx: usize) ->
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow_schema::SortOptions;
+ use itertools::Itertools;
+
+ use datafusion_common::{DFSchema, Result};
+ use datafusion_expr::execution_props::ExecutionProps;
+ use datafusion_expr::{BuiltinScalarFunction, Operator, ScalarUDF};
+
use crate::equivalence::tests::{
convert_to_orderings, convert_to_sort_exprs, create_random_schema,
create_test_params, generate_table_for_eq_properties,
is_table_same_after_sort,
@@ -231,14 +242,8 @@ mod tests {
use crate::expressions::Column;
use crate::expressions::{col, BinaryExpr};
use crate::functions::create_physical_expr;
+ use crate::utils::tests::TestScalarUDF;
use crate::{PhysicalExpr, PhysicalSortExpr};
- use arrow::datatypes::{DataType, Field, Schema};
- use arrow_schema::SortOptions;
- use datafusion_common::Result;
- use datafusion_expr::execution_props::ExecutionProps;
- use datafusion_expr::{BuiltinScalarFunction, Operator};
- use itertools::Itertools;
- use std::sync::Arc;
#[test]
fn test_ordering_satisfy() -> Result<()> {
@@ -281,17 +286,20 @@ mod tests {
let col_d = &col("d", &test_schema)?;
let col_e = &col("e", &test_schema)?;
let col_f = &col("f", &test_schema)?;
- let floor_a = &create_physical_expr(
- &BuiltinScalarFunction::Floor,
+ let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
+ let floor_a = &crate::udf::create_physical_expr(
+ &test_fun,
&[col("a", &test_schema)?],
&test_schema,
- &ExecutionProps::default(),
+ &[],
+ &DFSchema::empty(),
)?;
- let floor_f = &create_physical_expr(
- &BuiltinScalarFunction::Floor,
+ let floor_f = &crate::udf::create_physical_expr(
+ &test_fun,
&[col("f", &test_schema)?],
&test_schema,
- &ExecutionProps::default(),
+ &[],
+ &DFSchema::empty(),
)?;
let exp_a = &create_physical_expr(
&BuiltinScalarFunction::Exp,
@@ -804,11 +812,13 @@ mod tests {
let table_data_with_properties =
generate_table_for_eq_properties(&eq_properties, N_ELEMENTS,
N_DISTINCT)?;
- let floor_a = create_physical_expr(
- &BuiltinScalarFunction::Floor,
+ let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
+ let floor_a = crate::udf::create_physical_expr(
+ &test_fun,
&[col("a", &test_schema)?],
&test_schema,
- &ExecutionProps::default(),
+ &[],
+ &DFSchema::empty(),
)?;
let a_plus_b = Arc::new(BinaryExpr::new(
col("a", &test_schema)?,
diff --git a/datafusion/physical-expr/src/equivalence/projection.rs b/datafusion/physical-expr/src/equivalence/projection.rs
index b8231a74c2..5efcf5942c 100644
--- a/datafusion/physical-expr/src/equivalence/projection.rs
+++ b/datafusion/physical-expr/src/equivalence/projection.rs
@@ -17,13 +17,14 @@
use std::sync::Arc;
-use crate::expressions::Column;
-use crate::PhysicalExpr;
-
use arrow::datatypes::SchemaRef;
+
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
+use crate::expressions::Column;
+use crate::PhysicalExpr;
+
/// Stores the mapping between source expressions and target expressions for a
/// projection.
#[derive(Debug, Clone)]
@@ -111,7 +112,14 @@ impl ProjectionMapping {
mod tests {
use std::sync::Arc;
- use super::*;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow_schema::{SortOptions, TimeUnit};
+ use itertools::Itertools;
+
+ use datafusion_common::{DFSchema, Result};
+ use datafusion_expr::execution_props::ExecutionProps;
+ use datafusion_expr::{BuiltinScalarFunction, Operator, ScalarUDF};
+
use crate::equivalence::tests::{
apply_projection, convert_to_orderings, convert_to_orderings_owned,
create_random_schema, generate_table_for_eq_properties,
is_table_same_after_sort,
@@ -119,16 +127,11 @@ mod tests {
};
use crate::equivalence::EquivalenceProperties;
use crate::expressions::{col, BinaryExpr};
- use crate::functions::create_physical_expr;
+ use crate::udf::create_physical_expr;
+ use crate::utils::tests::TestScalarUDF;
use crate::PhysicalSortExpr;
- use arrow::datatypes::{DataType, Field, Schema};
- use arrow_schema::{SortOptions, TimeUnit};
- use datafusion_common::Result;
- use datafusion_expr::execution_props::ExecutionProps;
- use datafusion_expr::{BuiltinScalarFunction, Operator};
-
- use itertools::Itertools;
+ use super::*;
#[test]
fn project_orderings() -> Result<()> {
@@ -646,7 +649,7 @@ mod tests {
col_b.clone(),
)) as Arc<dyn PhysicalExpr>;
- let round_c = &create_physical_expr(
+ let round_c = &crate::functions::create_physical_expr(
&BuiltinScalarFunction::Round,
&[col_c.clone()],
&schema,
@@ -973,11 +976,13 @@ mod tests {
let table_data_with_properties =
generate_table_for_eq_properties(&eq_properties, N_ELEMENTS,
N_DISTINCT)?;
// Floor(a)
+ let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
let floor_a = create_physical_expr(
- &BuiltinScalarFunction::Floor,
+ &test_fun,
&[col("a", &test_schema)?],
&test_schema,
- &ExecutionProps::default(),
+ &[],
+ &DFSchema::empty(),
)?;
// a + b
let a_plus_b = Arc::new(BinaryExpr::new(
@@ -1049,11 +1054,13 @@ mod tests {
let table_data_with_properties =
generate_table_for_eq_properties(&eq_properties, N_ELEMENTS,
N_DISTINCT)?;
// Floor(a)
+ let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
let floor_a = create_physical_expr(
- &BuiltinScalarFunction::Floor,
+ &test_fun,
&[col("a", &test_schema)?],
&test_schema,
- &ExecutionProps::default(),
+ &[],
+ &DFSchema::empty(),
)?;
// a + b
let a_plus_b = Arc::new(BinaryExpr::new(
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs
index 7ce540b267..c14c88d6c6 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -18,7 +18,13 @@
use std::hash::{Hash, Hasher};
use std::sync::Arc;
-use super::ordering::collapse_lex_ordering;
+use arrow_schema::{SchemaRef, SortOptions};
+use indexmap::{IndexMap, IndexSet};
+use itertools::Itertools;
+
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{JoinSide, JoinType, Result};
+
use crate::equivalence::{
collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass,
ProjectionMapping,
};
@@ -30,12 +36,7 @@ use crate::{
PhysicalSortRequirement,
};
-use arrow_schema::{SchemaRef, SortOptions};
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{JoinSide, JoinType, Result};
-
-use indexmap::{IndexMap, IndexSet};
-use itertools::Itertools;
+use super::ordering::collapse_lex_ordering;
/// A `EquivalenceProperties` object stores useful information related to a schema.
/// Currently, it keeps track of:
@@ -1296,7 +1297,13 @@ mod tests {
use std::ops::Not;
use std::sync::Arc;
- use super::*;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow_schema::{Fields, SortOptions, TimeUnit};
+ use itertools::Itertools;
+
+ use datafusion_common::{DFSchema, Result};
+ use datafusion_expr::{Operator, ScalarUDF};
+
use crate::equivalence::add_offset_to_expr;
use crate::equivalence::tests::{
convert_to_orderings, convert_to_sort_exprs, convert_to_sort_reqs,
@@ -1304,16 +1311,10 @@ mod tests {
generate_table_for_eq_properties, is_table_same_after_sort,
output_schema,
};
use crate::expressions::{col, BinaryExpr, Column};
- use crate::functions::create_physical_expr;
+ use crate::utils::tests::TestScalarUDF;
use crate::PhysicalSortExpr;
- use arrow::datatypes::{DataType, Field, Schema};
- use arrow_schema::{Fields, SortOptions, TimeUnit};
- use datafusion_common::Result;
- use datafusion_expr::execution_props::ExecutionProps;
- use datafusion_expr::{BuiltinScalarFunction, Operator};
-
- use itertools::Itertools;
+ use super::*;
#[test]
fn project_equivalence_properties_test() -> Result<()> {
@@ -1792,11 +1793,13 @@ mod tests {
let table_data_with_properties =
generate_table_for_eq_properties(&eq_properties, N_ELEMENTS,
N_DISTINCT)?;
- let floor_a = create_physical_expr(
- &BuiltinScalarFunction::Floor,
+ let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
+ let floor_a = crate::udf::create_physical_expr(
+ &test_fun,
&[col("a", &test_schema)?],
&test_schema,
- &ExecutionProps::default(),
+ &[],
+ &DFSchema::empty(),
)?;
let a_plus_b = Arc::new(BinaryExpr::new(
col("a", &test_schema)?,
diff --git a/datafusion/physical-expr/src/functions.rs b/datafusion/physical-expr/src/functions.rs
index 770d918432..79d69b273d 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -184,16 +184,9 @@ pub fn create_physical_fun(
BuiltinScalarFunction::Factorial => {
Arc::new(|args|
make_scalar_function_inner(math_expressions::factorial)(args))
}
- BuiltinScalarFunction::Floor => Arc::new(math_expressions::floor),
- BuiltinScalarFunction::Gcd => {
- Arc::new(|args|
make_scalar_function_inner(math_expressions::gcd)(args))
- }
BuiltinScalarFunction::Iszero => {
Arc::new(|args|
make_scalar_function_inner(math_expressions::iszero)(args))
}
- BuiltinScalarFunction::Lcm => {
- Arc::new(|args|
make_scalar_function_inner(math_expressions::lcm)(args))
- }
BuiltinScalarFunction::Nanvl => {
Arc::new(|args|
make_scalar_function_inner(math_expressions::nanvl)(args))
}
@@ -204,7 +197,6 @@ pub fn create_physical_fun(
BuiltinScalarFunction::Trunc => {
Arc::new(|args|
make_scalar_function_inner(math_expressions::trunc)(args))
}
- BuiltinScalarFunction::Pi => Arc::new(math_expressions::pi),
BuiltinScalarFunction::Power => {
Arc::new(|args|
make_scalar_function_inner(math_expressions::power)(args))
}
@@ -573,7 +565,7 @@ mod tests {
let execution_props = ExecutionProps::new();
let schema = Schema::new(vec![Field::new("a", DataType::Int32,
false)]);
- let funs = [BuiltinScalarFunction::Pi, BuiltinScalarFunction::Random];
+ let funs = [BuiltinScalarFunction::Random];
for fun in funs.iter() {
create_physical_expr_with_type_coercion(fun, &[], &schema,
&execution_props)?;
diff --git a/datafusion/physical-expr/src/math_expressions.rs b/datafusion/physical-expr/src/math_expressions.rs
index f8244ad952..384f8d87eb 100644
--- a/datafusion/physical-expr/src/math_expressions.rs
+++ b/datafusion/physical-expr/src/math_expressions.rs
@@ -19,7 +19,6 @@
use std::any::type_name;
use std::iter;
-use std::mem::swap;
use std::sync::Arc;
use arrow::array::ArrayRef;
@@ -161,7 +160,6 @@ math_unary_function!("atan", atan);
math_unary_function!("asinh", asinh);
math_unary_function!("acosh", acosh);
math_unary_function!("atanh", atanh);
-math_unary_function!("floor", floor);
math_unary_function!("ceil", ceil);
math_unary_function!("exp", exp);
math_unary_function!("ln", ln);
@@ -181,79 +179,6 @@ pub fn factorial(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}
-/// Computes greatest common divisor using Binary GCD algorithm.
-fn compute_gcd(x: i64, y: i64) -> i64 {
- let mut a = x.wrapping_abs();
- let mut b = y.wrapping_abs();
-
- if a == 0 {
- return b;
- }
- if b == 0 {
- return a;
- }
-
- let shift = (a | b).trailing_zeros();
- a >>= shift;
- b >>= shift;
- a >>= a.trailing_zeros();
-
- loop {
- b >>= b.trailing_zeros();
- if a > b {
- swap(&mut a, &mut b);
- }
-
- b -= a;
-
- if b == 0 {
- return a << shift;
- }
- }
-}
-
-/// Gcd SQL function
-pub fn gcd(args: &[ArrayRef]) -> Result<ArrayRef> {
- match args[0].data_type() {
- DataType::Int64 => Ok(Arc::new(make_function_inputs2!(
- &args[0],
- &args[1],
- "x",
- "y",
- Int64Array,
- Int64Array,
- { compute_gcd }
- )) as ArrayRef),
- other => exec_err!("Unsupported data type {other:?} for function gcd"),
- }
-}
-
-/// Lcm SQL function
-pub fn lcm(args: &[ArrayRef]) -> Result<ArrayRef> {
- let compute_lcm = |x: i64, y: i64| {
- let a = x.wrapping_abs();
- let b = y.wrapping_abs();
-
- if a == 0 || b == 0 {
- return 0;
- }
- a / compute_gcd(a, b) * b
- };
-
- match args[0].data_type() {
- DataType::Int64 => Ok(Arc::new(make_function_inputs2!(
- &args[0],
- &args[1],
- "x",
- "y",
- Int64Array,
- Int64Array,
- { compute_lcm }
- )) as ArrayRef),
- other => exec_err!("Unsupported data type {other:?} for function lcm"),
- }
-}
-
/// Nanvl SQL function
pub fn nanvl(args: &[ArrayRef]) -> Result<ArrayRef> {
match args[0].data_type() {
@@ -345,15 +270,6 @@ pub fn iszero(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}
-/// Pi SQL function
-pub fn pi(args: &[ColumnarValue]) -> Result<ColumnarValue> {
- if !matches!(&args[0], ColumnarValue::Array(_)) {
- return exec_err!("Expect pi function to take no param");
- }
- let array = Float64Array::from_value(std::f64::consts::PI, 1);
- Ok(ColumnarValue::Array(Arc::new(array)))
-}
-
/// Random SQL function
pub fn random(args: &[ColumnarValue]) -> Result<ColumnarValue> {
let len: usize = match &args[0] {
@@ -808,40 +724,6 @@ mod tests {
assert_eq!(ints, &expected);
}
- #[test]
- fn test_gcd_i64() {
- let args: Vec<ArrayRef> = vec![
- Arc::new(Int64Array::from(vec![0, 3, 25, -16])), // x
- Arc::new(Int64Array::from(vec![0, -2, 15, 8])), // y
- ];
-
- let result = gcd(&args).expect("failed to initialize function gcd");
- let ints = as_int64_array(&result).expect("failed to initialize
function gcd");
-
- assert_eq!(ints.len(), 4);
- assert_eq!(ints.value(0), 0);
- assert_eq!(ints.value(1), 1);
- assert_eq!(ints.value(2), 5);
- assert_eq!(ints.value(3), 8);
- }
-
- #[test]
- fn test_lcm_i64() {
- let args: Vec<ArrayRef> = vec![
- Arc::new(Int64Array::from(vec![0, 3, 25, -16])), // x
- Arc::new(Int64Array::from(vec![0, -2, 15, 8])), // y
- ];
-
- let result = lcm(&args).expect("failed to initialize function lcm");
- let ints = as_int64_array(&result).expect("failed to initialize
function lcm");
-
- assert_eq!(ints.len(), 4);
- assert_eq!(ints.value(0), 0);
- assert_eq!(ints.value(1), 6);
- assert_eq!(ints.value(2), 75);
- assert_eq!(ints.value(3), 16);
- }
-
#[test]
fn test_cot_f32() {
let args: Vec<ArrayRef> =
diff --git a/datafusion/physical-expr/src/udf.rs
b/datafusion/physical-expr/src/udf.rs
index 4fc94bfa15..368dfdf92f 100644
--- a/datafusion/physical-expr/src/udf.rs
+++ b/datafusion/physical-expr/src/udf.rs
@@ -16,14 +16,17 @@
// under the License.
//! UDF support
-use crate::{PhysicalExpr, ScalarFunctionExpr};
+use std::sync::Arc;
+
use arrow_schema::Schema;
+
use datafusion_common::{DFSchema, Result};
pub use datafusion_expr::ScalarUDF;
use datafusion_expr::{
type_coercion::functions::data_types, Expr, ScalarFunctionDefinition,
};
-use std::sync::Arc;
+
+use crate::{PhysicalExpr, ScalarFunctionExpr};
/// Create a physical expression of the UDF.
///
@@ -60,58 +63,18 @@ pub fn create_physical_expr(
#[cfg(test)]
mod tests {
- use arrow_schema::{DataType, Schema};
+ use arrow_schema::Schema;
+
use datafusion_common::{DFSchema, Result};
- use datafusion_expr::{
- ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature,
Volatility,
- };
+ use datafusion_expr::ScalarUDF;
+ use crate::utils::tests::TestScalarUDF;
use crate::ScalarFunctionExpr;
use super::create_physical_expr;
#[test]
fn test_functions() -> Result<()> {
- #[derive(Debug, Clone)]
- struct TestScalarUDF {
- signature: Signature,
- }
-
- impl TestScalarUDF {
- fn new() -> Self {
- let signature =
- Signature::exact(vec![DataType::Float64],
Volatility::Immutable);
-
- Self { signature }
- }
- }
-
- impl ScalarUDFImpl for TestScalarUDF {
- fn as_any(&self) -> &dyn std::any::Any {
- self
- }
-
- fn name(&self) -> &str {
- "my_fn"
- }
-
- fn signature(&self) -> &Signature {
- &self.signature
- }
-
- fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType>
{
- Ok(DataType::Float64)
- }
-
- fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue>
{
- unimplemented!("my_fn is not implemented")
- }
-
- fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
- Ok(Some(vec![Some(true)]))
- }
- }
-
// create and register the udf
let udf = ScalarUDF::from(TestScalarUDF::new());
diff --git a/datafusion/physical-expr/src/utils/mod.rs
b/datafusion/physical-expr/src/utils/mod.rs
index e55bc3d156..d7bebbff89 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -256,7 +256,9 @@ pub fn merge_vectors(
}
#[cfg(test)]
-mod tests {
+pub(crate) mod tests {
+ use arrow_array::{ArrayRef, Float32Array, Float64Array};
+ use std::any::Any;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
@@ -265,10 +267,103 @@ mod tests {
use crate::PhysicalSortExpr;
use arrow_schema::{DataType, Field, Schema};
- use datafusion_common::{Result, ScalarValue};
+ use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
+ use datafusion_expr::{
+ ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility,
+ };
use petgraph::visit::Bfs;
+ #[derive(Debug, Clone)]
+ pub struct TestScalarUDF {
+ signature: Signature,
+ }
+
+ impl TestScalarUDF {
+ pub fn new() -> Self {
+ use DataType::*;
+ Self {
+ signature: Signature::uniform(
+ 1,
+ vec![Float64, Float32],
+ Volatility::Immutable,
+ ),
+ }
+ }
+ }
+
+ impl ScalarUDFImpl for TestScalarUDF {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+ fn name(&self) -> &str {
+ "test-scalar-udf"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+ let arg_type = &arg_types[0];
+
+ match arg_type {
+ DataType::Float32 => Ok(DataType::Float32),
+ _ => Ok(DataType::Float64),
+ }
+ }
+
+ fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
+ Ok(Some(vec![Some(true)]))
+ }
+
+ fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+ let args = ColumnarValue::values_to_arrays(args)?;
+
+ let arr: ArrayRef = match args[0].data_type() {
+ DataType::Float64 => Arc::new({
+ let arg = &args[0]
+ .as_any()
+ .downcast_ref::<Float64Array>()
+ .ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "could not cast {} to {}",
+ self.name(),
+ std::any::type_name::<Float64Array>()
+ ))
+ })?;
+
+ arg.iter()
+ .map(|a| a.map(f64::floor))
+ .collect::<Float64Array>()
+ }),
+ DataType::Float32 => Arc::new({
+ let arg = &args[0]
+ .as_any()
+ .downcast_ref::<Float32Array>()
+ .ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "could not cast {} to {}",
+ self.name(),
+ std::any::type_name::<Float32Array>()
+ ))
+ })?;
+
+ arg.iter()
+ .map(|a| a.map(f32::floor))
+ .collect::<Float32Array>()
+ }),
+ other => {
+ return exec_err!(
+ "Unsupported data type {other:?} for function {}",
+ self.name()
+ );
+ }
+ };
+ Ok(ColumnarValue::Array(arr))
+ }
+ }
+
#[derive(Clone)]
struct DummyProperty {
expr_type: String,
diff --git a/datafusion/proto/proto/datafusion.proto
b/datafusion/proto/proto/datafusion.proto
index 7f967657f5..b656bededc 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -550,7 +550,7 @@ enum ScalarFunction {
// 6 was Cos
// 7 was Digest
Exp = 8;
- Floor = 9;
+ // 9 was Floor
// 10 was Ln
Log = 11;
// 12 was Log10
@@ -621,12 +621,12 @@ enum ScalarFunction {
// 77 was Sinh
// 78 was Cosh
// Tanh = 79
- Pi = 80;
+ // 80 was Pi
// 81 was Degrees
// 82 was Radians
Factorial = 83;
- Lcm = 84;
- Gcd = 85;
+ // 84 was Lcm
+ // 85 was Gcd
// 86 was ArrayAppend
// 87 was ArrayConcat
// 88 was ArrayDims
diff --git a/datafusion/proto/src/generated/pbjson.rs
b/datafusion/proto/src/generated/pbjson.rs
index 966d7f7f74..c13ae045bd 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -22794,7 +22794,6 @@ impl serde::Serialize for ScalarFunction {
Self::Unknown => "unknown",
Self::Ceil => "Ceil",
Self::Exp => "Exp",
- Self::Floor => "Floor",
Self::Log => "Log",
Self::Round => "Round",
Self::Trunc => "Trunc",
@@ -22804,10 +22803,7 @@ impl serde::Serialize for ScalarFunction {
Self::Random => "Random",
Self::Coalesce => "Coalesce",
Self::Power => "Power",
- Self::Pi => "Pi",
Self::Factorial => "Factorial",
- Self::Lcm => "Lcm",
- Self::Gcd => "Gcd",
Self::Cot => "Cot",
Self::Nanvl => "Nanvl",
Self::Iszero => "Iszero",
@@ -22826,7 +22822,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"unknown",
"Ceil",
"Exp",
- "Floor",
"Log",
"Round",
"Trunc",
@@ -22836,10 +22831,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"Random",
"Coalesce",
"Power",
- "Pi",
"Factorial",
- "Lcm",
- "Gcd",
"Cot",
"Nanvl",
"Iszero",
@@ -22887,7 +22879,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"unknown" => Ok(ScalarFunction::Unknown),
"Ceil" => Ok(ScalarFunction::Ceil),
"Exp" => Ok(ScalarFunction::Exp),
- "Floor" => Ok(ScalarFunction::Floor),
"Log" => Ok(ScalarFunction::Log),
"Round" => Ok(ScalarFunction::Round),
"Trunc" => Ok(ScalarFunction::Trunc),
@@ -22897,10 +22888,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
"Random" => Ok(ScalarFunction::Random),
"Coalesce" => Ok(ScalarFunction::Coalesce),
"Power" => Ok(ScalarFunction::Power),
- "Pi" => Ok(ScalarFunction::Pi),
"Factorial" => Ok(ScalarFunction::Factorial),
- "Lcm" => Ok(ScalarFunction::Lcm),
- "Gcd" => Ok(ScalarFunction::Gcd),
"Cot" => Ok(ScalarFunction::Cot),
"Nanvl" => Ok(ScalarFunction::Nanvl),
"Iszero" => Ok(ScalarFunction::Iszero),
diff --git a/datafusion/proto/src/generated/prost.rs
b/datafusion/proto/src/generated/prost.rs
index c94aa1f4ed..092d5c59d0 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2849,7 +2849,7 @@ pub enum ScalarFunction {
/// 6 was Cos
/// 7 was Digest
Exp = 8,
- Floor = 9,
+ /// 9 was Floor
/// 10 was Ln
Log = 11,
/// 12 was Log10
@@ -2920,12 +2920,12 @@ pub enum ScalarFunction {
/// 77 was Sinh
/// 78 was Cosh
/// Tanh = 79
- Pi = 80,
+ /// 80 was Pi
/// 81 was Degrees
/// 82 was Radians
Factorial = 83,
- Lcm = 84,
- Gcd = 85,
+ /// 84 was Lcm
+ /// 85 was Gcd
/// 86 was ArrayAppend
/// 87 was ArrayConcat
/// 88 was ArrayDims
@@ -2989,7 +2989,6 @@ impl ScalarFunction {
ScalarFunction::Unknown => "unknown",
ScalarFunction::Ceil => "Ceil",
ScalarFunction::Exp => "Exp",
- ScalarFunction::Floor => "Floor",
ScalarFunction::Log => "Log",
ScalarFunction::Round => "Round",
ScalarFunction::Trunc => "Trunc",
@@ -2999,10 +2998,7 @@ impl ScalarFunction {
ScalarFunction::Random => "Random",
ScalarFunction::Coalesce => "Coalesce",
ScalarFunction::Power => "Power",
- ScalarFunction::Pi => "Pi",
ScalarFunction::Factorial => "Factorial",
- ScalarFunction::Lcm => "Lcm",
- ScalarFunction::Gcd => "Gcd",
ScalarFunction::Cot => "Cot",
ScalarFunction::Nanvl => "Nanvl",
ScalarFunction::Iszero => "Iszero",
@@ -3015,7 +3011,6 @@ impl ScalarFunction {
"unknown" => Some(Self::Unknown),
"Ceil" => Some(Self::Ceil),
"Exp" => Some(Self::Exp),
- "Floor" => Some(Self::Floor),
"Log" => Some(Self::Log),
"Round" => Some(Self::Round),
"Trunc" => Some(Self::Trunc),
@@ -3025,10 +3020,7 @@ impl ScalarFunction {
"Random" => Some(Self::Random),
"Coalesce" => Some(Self::Coalesce),
"Power" => Some(Self::Power),
- "Pi" => Some(Self::Pi),
"Factorial" => Some(Self::Factorial),
- "Lcm" => Some(Self::Lcm),
- "Gcd" => Some(Self::Gcd),
"Cot" => Some(Self::Cot),
"Nanvl" => Some(Self::Nanvl),
"Iszero" => Some(Self::Iszero),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs
b/datafusion/proto/src/logical_plan/from_proto.rs
index 96b3b5942e..9c24a39418 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -39,9 +39,9 @@ use datafusion_expr::window_frame::{check_window_frame,
regularize_window_order_
use datafusion_expr::{
ceil, coalesce, concat_expr, concat_ws_expr, cot, ends_with, exp,
expr::{self, InList, Sort, WindowFunction},
- factorial, floor, gcd, initcap, iszero, lcm, log,
+ factorial, initcap, iszero, log,
logical_plan::{PlanType, StringifiedPlan},
- nanvl, pi, power, random, round, trunc, AggregateFunction, Between,
BinaryExpr,
+ nanvl, power, random, round, trunc, AggregateFunction, Between, BinaryExpr,
BuiltInWindowFunction, BuiltinScalarFunction, Case, Cast, Expr,
GetFieldAccess,
GetIndexedField, GroupingSet,
GroupingSet::GroupingSets,
@@ -423,9 +423,6 @@ impl From<&protobuf::ScalarFunction> for
BuiltinScalarFunction {
ScalarFunction::Exp => Self::Exp,
ScalarFunction::Log => Self::Log,
ScalarFunction::Factorial => Self::Factorial,
- ScalarFunction::Gcd => Self::Gcd,
- ScalarFunction::Lcm => Self::Lcm,
- ScalarFunction::Floor => Self::Floor,
ScalarFunction::Ceil => Self::Ceil,
ScalarFunction::Round => Self::Round,
ScalarFunction::Trunc => Self::Trunc,
@@ -435,7 +432,6 @@ impl From<&protobuf::ScalarFunction> for
BuiltinScalarFunction {
ScalarFunction::InitCap => Self::InitCap,
ScalarFunction::Random => Self::Random,
ScalarFunction::Coalesce => Self::Coalesce,
- ScalarFunction::Pi => Self::Pi,
ScalarFunction::Power => Self::Power,
ScalarFunction::Nanvl => Self::Nanvl,
ScalarFunction::Iszero => Self::Iszero,
@@ -1301,9 +1297,6 @@ pub fn parse_expr(
match scalar_function {
ScalarFunction::Unknown => Err(proto_error("Unknown scalar
function")),
ScalarFunction::Exp => Ok(exp(parse_expr(&args[0], registry,
codec)?)),
- ScalarFunction::Floor => {
- Ok(floor(parse_expr(&args[0], registry, codec)?))
- }
ScalarFunction::Factorial => {
Ok(factorial(parse_expr(&args[0], registry, codec)?))
}
@@ -1313,14 +1306,6 @@ pub fn parse_expr(
ScalarFunction::InitCap => {
Ok(initcap(parse_expr(&args[0], registry, codec)?))
}
- ScalarFunction::Gcd => Ok(gcd(
- parse_expr(&args[0], registry, codec)?,
- parse_expr(&args[1], registry, codec)?,
- )),
- ScalarFunction::Lcm => Ok(lcm(
- parse_expr(&args[0], registry, codec)?,
- parse_expr(&args[1], registry, codec)?,
- )),
ScalarFunction::Random => Ok(random()),
ScalarFunction::Concat => {
Ok(concat_expr(parse_exprs(args, registry, codec)?))
@@ -1335,7 +1320,6 @@ pub fn parse_expr(
ScalarFunction::Coalesce => {
Ok(coalesce(parse_exprs(args, registry, codec)?))
}
- ScalarFunction::Pi => Ok(pi()),
ScalarFunction::Power => Ok(power(
parse_expr(&args[0], registry, codec)?,
parse_expr(&args[1], registry, codec)?,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs
b/datafusion/proto/src/logical_plan/to_proto.rs
index a10edb3932..bd964b43d4 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -1410,10 +1410,7 @@ impl TryFrom<&BuiltinScalarFunction> for
protobuf::ScalarFunction {
BuiltinScalarFunction::Cot => Self::Cot,
BuiltinScalarFunction::Exp => Self::Exp,
BuiltinScalarFunction::Factorial => Self::Factorial,
- BuiltinScalarFunction::Gcd => Self::Gcd,
- BuiltinScalarFunction::Lcm => Self::Lcm,
BuiltinScalarFunction::Log => Self::Log,
- BuiltinScalarFunction::Floor => Self::Floor,
BuiltinScalarFunction::Ceil => Self::Ceil,
BuiltinScalarFunction::Round => Self::Round,
BuiltinScalarFunction::Trunc => Self::Trunc,
@@ -1423,7 +1420,6 @@ impl TryFrom<&BuiltinScalarFunction> for
protobuf::ScalarFunction {
BuiltinScalarFunction::InitCap => Self::InitCap,
BuiltinScalarFunction::Random => Self::Random,
BuiltinScalarFunction::Coalesce => Self::Coalesce,
- BuiltinScalarFunction::Pi => Self::Pi,
BuiltinScalarFunction::Power => Self::Power,
BuiltinScalarFunction::Nanvl => Self::Nanvl,
BuiltinScalarFunction::Iszero => Self::Iszero,
diff --git a/datafusion/sql/src/expr/function.rs
b/datafusion/sql/src/expr/function.rs
index e97eb1a32b..4bf0906685 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -18,7 +18,8 @@
use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
use arrow_schema::DataType;
use datafusion_common::{
- not_impl_err, plan_datafusion_err, plan_err, DFSchema, Dependency, Result,
+ internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err,
DFSchema,
+ Dependency, Result,
};
use datafusion_expr::window_frame::{check_window_frame,
regularize_window_order_by};
use datafusion_expr::{
@@ -264,6 +265,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
plan_err!("Invalid function '{name}'.\nDid you mean
'{suggested_func_name}'?")
}
+ pub(super) fn sql_fn_name_to_expr(
+ &self,
+ expr: SQLExpr,
+ fn_name: &str,
+ schema: &DFSchema,
+ planner_context: &mut PlannerContext,
+ ) -> Result<Expr> {
+ let fun = self
+ .context_provider
+ .get_function_meta(fn_name)
+ .ok_or_else(|| {
+ internal_datafusion_err!("Unable to find expected '{fn_name}'
function")
+ })?;
+ let args = vec![self.sql_expr_to_logical_expr(expr, schema,
planner_context)?];
+ Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
+ }
+
pub(super) fn sql_named_function_to_expr(
&self,
expr: SQLExpr,
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index c2f72720af..7763fa2d8d 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -518,12 +518,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
SQLExpr::Floor {
expr,
field: _field,
- } => self.sql_named_function_to_expr(
- *expr,
- BuiltinScalarFunction::Floor,
- schema,
- planner_context,
- ),
+ } => self.sql_fn_name_to_expr(*expr, "floor", schema,
planner_context),
SQLExpr::Ceil {
expr,
field: _field,