This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bece785174 move Floor, Gcd, Lcm, Pi to datafusion-functions (#9976)
bece785174 is described below

commit bece785174c199f4fde4343a27c2213fae11bfb8
Author: Bruce Ritchie <[email protected]>
AuthorDate: Mon Apr 8 12:03:00 2024 -0400

    move Floor, Gcd, Lcm, Pi to datafusion-functions (#9976)
    
    * move Floor, Gcd, Lcm, Pi to datafusion-functions
---
 datafusion/expr/src/built_in_function.rs           |  29 +----
 datafusion/expr/src/expr_fn.rs                     |  17 +--
 datafusion/functions/src/math/gcd.rs               | 145 +++++++++++++++++++++
 datafusion/functions/src/math/lcm.rs               | 126 ++++++++++++++++++
 datafusion/functions/src/math/mod.rs               |  14 +-
 datafusion/functions/src/math/pi.rs                |  76 +++++++++++
 datafusion/optimizer/src/analyzer/type_coercion.rs |  30 +++--
 .../physical-expr/src/equivalence/ordering.rs      |  44 ++++---
 .../physical-expr/src/equivalence/projection.rs    |  41 +++---
 .../physical-expr/src/equivalence/properties.rs    |  41 +++---
 datafusion/physical-expr/src/functions.rs          |  10 +-
 datafusion/physical-expr/src/math_expressions.rs   | 118 -----------------
 datafusion/physical-expr/src/udf.rs                |  55 ++------
 datafusion/physical-expr/src/utils/mod.rs          |  99 +++++++++++++-
 datafusion/proto/proto/datafusion.proto            |   8 +-
 datafusion/proto/src/generated/pbjson.rs           |  12 --
 datafusion/proto/src/generated/prost.rs            |  16 +--
 datafusion/proto/src/logical_plan/from_proto.rs    |  20 +--
 datafusion/proto/src/logical_plan/to_proto.rs      |   4 -
 datafusion/sql/src/expr/function.rs                |  20 ++-
 datafusion/sql/src/expr/mod.rs                     |   7 +-
 21 files changed, 588 insertions(+), 344 deletions(-)

diff --git a/datafusion/expr/src/built_in_function.rs 
b/datafusion/expr/src/built_in_function.rs
index dc1fc98a5c..7426ccd938 100644
--- a/datafusion/expr/src/built_in_function.rs
+++ b/datafusion/expr/src/built_in_function.rs
@@ -45,20 +45,12 @@ pub enum BuiltinScalarFunction {
     Exp,
     /// factorial
     Factorial,
-    /// floor
-    Floor,
-    /// gcd, Greatest common divisor
-    Gcd,
-    /// lcm, Least common multiple
-    Lcm,
     /// iszero
     Iszero,
     /// log, same as log10
     Log,
     /// nanvl
     Nanvl,
-    /// pi
-    Pi,
     /// power
     Power,
     /// round
@@ -135,13 +127,9 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::Coalesce => Volatility::Immutable,
             BuiltinScalarFunction::Exp => Volatility::Immutable,
             BuiltinScalarFunction::Factorial => Volatility::Immutable,
-            BuiltinScalarFunction::Floor => Volatility::Immutable,
-            BuiltinScalarFunction::Gcd => Volatility::Immutable,
             BuiltinScalarFunction::Iszero => Volatility::Immutable,
-            BuiltinScalarFunction::Lcm => Volatility::Immutable,
             BuiltinScalarFunction::Log => Volatility::Immutable,
             BuiltinScalarFunction::Nanvl => Volatility::Immutable,
-            BuiltinScalarFunction::Pi => Volatility::Immutable,
             BuiltinScalarFunction::Power => Volatility::Immutable,
             BuiltinScalarFunction::Round => Volatility::Immutable,
             BuiltinScalarFunction::Cot => Volatility::Immutable,
@@ -183,13 +171,10 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::InitCap => {
                 utf8_to_str_type(&input_expr_types[0], "initcap")
             }
-            BuiltinScalarFunction::Pi => Ok(Float64),
             BuiltinScalarFunction::Random => Ok(Float64),
             BuiltinScalarFunction::EndsWith => Ok(Boolean),
 
-            BuiltinScalarFunction::Factorial
-            | BuiltinScalarFunction::Gcd
-            | BuiltinScalarFunction::Lcm => Ok(Int64),
+            BuiltinScalarFunction::Factorial => Ok(Int64),
 
             BuiltinScalarFunction::Power => match &input_expr_types[0] {
                 Int64 => Ok(Int64),
@@ -210,7 +195,6 @@ impl BuiltinScalarFunction {
 
             BuiltinScalarFunction::Ceil
             | BuiltinScalarFunction::Exp
-            | BuiltinScalarFunction::Floor
             | BuiltinScalarFunction::Round
             | BuiltinScalarFunction::Trunc
             | BuiltinScalarFunction::Cot => match input_expr_types[0] {
@@ -248,7 +232,6 @@ impl BuiltinScalarFunction {
                 ],
                 self.volatility(),
             ),
-            BuiltinScalarFunction::Pi => Signature::exact(vec![], 
self.volatility()),
             BuiltinScalarFunction::Random => Signature::exact(vec![], 
self.volatility()),
             BuiltinScalarFunction::Power => Signature::one_of(
                 vec![Exact(vec![Int64, Int64]), Exact(vec![Float64, Float64])],
@@ -289,12 +272,8 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::Factorial => {
                 Signature::uniform(1, vec![Int64], self.volatility())
             }
-            BuiltinScalarFunction::Gcd | BuiltinScalarFunction::Lcm => {
-                Signature::uniform(2, vec![Int64], self.volatility())
-            }
             BuiltinScalarFunction::Ceil
             | BuiltinScalarFunction::Exp
-            | BuiltinScalarFunction::Floor
             | BuiltinScalarFunction::Cot => {
                 // math expressions expect 1 argument of type f64 or f32
                 // priority is given to f64 because e.g. `sqrt(1i32)` is in IR 
(real numbers) and thus we
@@ -319,10 +298,8 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::Ceil
                 | BuiltinScalarFunction::Exp
                 | BuiltinScalarFunction::Factorial
-                | BuiltinScalarFunction::Floor
                 | BuiltinScalarFunction::Round
                 | BuiltinScalarFunction::Trunc
-                | BuiltinScalarFunction::Pi
         ) {
             Some(vec![Some(true)])
         } else if *self == BuiltinScalarFunction::Log {
@@ -339,13 +316,9 @@ impl BuiltinScalarFunction {
             BuiltinScalarFunction::Cot => &["cot"],
             BuiltinScalarFunction::Exp => &["exp"],
             BuiltinScalarFunction::Factorial => &["factorial"],
-            BuiltinScalarFunction::Floor => &["floor"],
-            BuiltinScalarFunction::Gcd => &["gcd"],
             BuiltinScalarFunction::Iszero => &["iszero"],
-            BuiltinScalarFunction::Lcm => &["lcm"],
             BuiltinScalarFunction::Log => &["log"],
             BuiltinScalarFunction::Nanvl => &["nanvl"],
-            BuiltinScalarFunction::Pi => &["pi"],
             BuiltinScalarFunction::Power => &["power", "pow"],
             BuiltinScalarFunction::Random => &["random"],
             BuiltinScalarFunction::Round => &["round"],
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index f68685a87f..6c811ff064 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -297,11 +297,6 @@ pub fn concat_ws(sep: Expr, values: Vec<Expr>) -> Expr {
     ))
 }
 
-/// Returns an approximate value of π
-pub fn pi() -> Expr {
-    Expr::ScalarFunction(ScalarFunction::new(BuiltinScalarFunction::Pi, 
vec![]))
-}
-
 /// Returns a random value in the range 0.0 <= x < 1.0
 pub fn random() -> Expr {
     Expr::ScalarFunction(ScalarFunction::new(BuiltinScalarFunction::Random, 
vec![]))
@@ -537,12 +532,6 @@ macro_rules! nary_scalar_expr {
 // math functions
 scalar_expr!(Cot, cot, num, "cotangent of a number");
 scalar_expr!(Factorial, factorial, num, "factorial");
-scalar_expr!(
-    Floor,
-    floor,
-    num,
-    "nearest integer less than or equal to argument"
-);
 scalar_expr!(
     Ceil,
     ceil,
@@ -556,8 +545,7 @@ nary_scalar_expr!(
     "truncate toward zero, with optional precision"
 );
 scalar_expr!(Exp, exp, num, "exponential");
-scalar_expr!(Gcd, gcd, arg_1 arg_2, "greatest common divisor");
-scalar_expr!(Lcm, lcm, arg_1 arg_2, "least common multiple");
+
 scalar_expr!(Power, power, base exponent, "`base` raised to the power of 
`exponent`");
 scalar_expr!(Log, log, base x, "logarithm of a `x` for a particular `base`");
 
@@ -974,7 +962,6 @@ mod test {
     fn scalar_function_definitions() {
         test_unary_scalar_expr!(Cot, cot);
         test_unary_scalar_expr!(Factorial, factorial);
-        test_unary_scalar_expr!(Floor, floor);
         test_unary_scalar_expr!(Ceil, ceil);
         test_nary_scalar_expr!(Round, round, input);
         test_nary_scalar_expr!(Round, round, input, decimal_places);
@@ -984,8 +971,6 @@ mod test {
         test_scalar_expr!(Nanvl, nanvl, x, y);
         test_scalar_expr!(Iszero, iszero, input);
 
-        test_scalar_expr!(Gcd, gcd, arg_1, arg_2);
-        test_scalar_expr!(Lcm, lcm, arg_1, arg_2);
         test_scalar_expr!(InitCap, initcap, string);
         test_scalar_expr!(EndsWith, ends_with, string, characters);
     }
diff --git a/datafusion/functions/src/math/gcd.rs 
b/datafusion/functions/src/math/gcd.rs
new file mode 100644
index 0000000000..41c9e4e233
--- /dev/null
+++ b/datafusion/functions/src/math/gcd.rs
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, Int64Array};
+use std::any::Any;
+use std::mem::swap;
+use std::sync::Arc;
+
+use arrow::datatypes::DataType;
+use arrow::datatypes::DataType::Int64;
+
+use crate::utils::make_scalar_function;
+use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_expr::ColumnarValue;
+use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
+
+#[derive(Debug)]
+pub struct GcdFunc {
+    signature: Signature,
+}
+
+impl Default for GcdFunc {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl GcdFunc {
+    pub fn new() -> Self {
+        use DataType::*;
+        Self {
+            signature: Signature::uniform(2, vec![Int64], 
Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for GcdFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "gcd"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(Int64)
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        make_scalar_function(gcd, vec![])(args)
+    }
+}
+
+/// Gcd SQL function
+fn gcd(args: &[ArrayRef]) -> Result<ArrayRef> {
+    match args[0].data_type() {
+        Int64 => Ok(Arc::new(make_function_inputs2!(
+            &args[0],
+            &args[1],
+            "x",
+            "y",
+            Int64Array,
+            Int64Array,
+            { compute_gcd }
+        )) as ArrayRef),
+        other => exec_err!("Unsupported data type {other:?} for function gcd"),
+    }
+}
+
+/// Computes greatest common divisor using Binary GCD algorithm.
+pub fn compute_gcd(x: i64, y: i64) -> i64 {
+    let mut a = x.wrapping_abs();
+    let mut b = y.wrapping_abs();
+
+    if a == 0 {
+        return b;
+    }
+    if b == 0 {
+        return a;
+    }
+
+    let shift = (a | b).trailing_zeros();
+    a >>= shift;
+    b >>= shift;
+    a >>= a.trailing_zeros();
+
+    loop {
+        b >>= b.trailing_zeros();
+        if a > b {
+            swap(&mut a, &mut b);
+        }
+
+        b -= a;
+
+        if b == 0 {
+            return a << shift;
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use arrow::array::{ArrayRef, Int64Array};
+
+    use crate::math::gcd::gcd;
+    use datafusion_common::cast::as_int64_array;
+
+    #[test]
+    fn test_gcd_i64() {
+        let args: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![0, 3, 25, -16])), // x
+            Arc::new(Int64Array::from(vec![0, -2, 15, 8])),  // y
+        ];
+
+        let result = gcd(&args).expect("failed to initialize function gcd");
+        let ints = as_int64_array(&result).expect("failed to initialize 
function gcd");
+
+        assert_eq!(ints.len(), 4);
+        assert_eq!(ints.value(0), 0);
+        assert_eq!(ints.value(1), 1);
+        assert_eq!(ints.value(2), 5);
+        assert_eq!(ints.value(3), 8);
+    }
+}
diff --git a/datafusion/functions/src/math/lcm.rs 
b/datafusion/functions/src/math/lcm.rs
new file mode 100644
index 0000000000..3674f7371d
--- /dev/null
+++ b/datafusion/functions/src/math/lcm.rs
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::{ArrayRef, Int64Array};
+use arrow::datatypes::DataType;
+use arrow::datatypes::DataType::Int64;
+
+use datafusion_common::{exec_err, DataFusionError, Result};
+use datafusion_expr::ColumnarValue;
+use datafusion_expr::{ScalarUDFImpl, Signature, Volatility};
+
+use crate::math::gcd::compute_gcd;
+use crate::utils::make_scalar_function;
+
+#[derive(Debug)]
+pub struct LcmFunc {
+    signature: Signature,
+}
+
+impl Default for LcmFunc {
+    fn default() -> Self {
+        LcmFunc::new()
+    }
+}
+
+impl LcmFunc {
+    pub fn new() -> Self {
+        use DataType::*;
+        Self {
+            signature: Signature::uniform(2, vec![Int64], 
Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for LcmFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "lcm"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(Int64)
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        make_scalar_function(lcm, vec![])(args)
+    }
+}
+
+/// Lcm SQL function
+fn lcm(args: &[ArrayRef]) -> Result<ArrayRef> {
+    let compute_lcm = |x: i64, y: i64| {
+        let a = x.wrapping_abs();
+        let b = y.wrapping_abs();
+
+        if a == 0 || b == 0 {
+            return 0;
+        }
+        a / compute_gcd(a, b) * b
+    };
+
+    match args[0].data_type() {
+        Int64 => Ok(Arc::new(make_function_inputs2!(
+            &args[0],
+            &args[1],
+            "x",
+            "y",
+            Int64Array,
+            Int64Array,
+            { compute_lcm }
+        )) as ArrayRef),
+        other => exec_err!("Unsupported data type {other:?} for function lcm"),
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use arrow::array::{ArrayRef, Int64Array};
+
+    use datafusion_common::cast::as_int64_array;
+
+    use crate::math::lcm::lcm;
+
+    #[test]
+    fn test_lcm_i64() {
+        let args: Vec<ArrayRef> = vec![
+            Arc::new(Int64Array::from(vec![0, 3, 25, -16])), // x
+            Arc::new(Int64Array::from(vec![0, -2, 15, 8])),  // y
+        ];
+
+        let result = lcm(&args).expect("failed to initialize function lcm");
+        let ints = as_int64_array(&result).expect("failed to initialize 
function lcm");
+
+        assert_eq!(ints.len(), 4);
+        assert_eq!(ints.value(0), 0);
+        assert_eq!(ints.value(1), 6);
+        assert_eq!(ints.value(2), 75);
+        assert_eq!(ints.value(3), 16);
+    }
+}
diff --git a/datafusion/functions/src/math/mod.rs 
b/datafusion/functions/src/math/mod.rs
index f241c8b325..3a1f7cc13b 100644
--- a/datafusion/functions/src/math/mod.rs
+++ b/datafusion/functions/src/math/mod.rs
@@ -18,11 +18,17 @@
 //! "math" DataFusion functions
 
 pub mod abs;
+pub mod gcd;
+pub mod lcm;
 pub mod nans;
+pub mod pi;
 
 // Create UDFs
 make_udf_function!(nans::IsNanFunc, ISNAN, isnan);
 make_udf_function!(abs::AbsFunc, ABS, abs);
+make_udf_function!(gcd::GcdFunc, GCD, gcd);
+make_udf_function!(lcm::LcmFunc, LCM, lcm);
+make_udf_function!(pi::PiFunc, PI, pi);
 
 make_math_unary_udf!(Log2Func, LOG2, log2, log2, Some(vec![Some(true)]));
 make_math_unary_udf!(Log10Func, LOG10, log10, log10, Some(vec![Some(true)]));
@@ -50,6 +56,8 @@ make_math_unary_udf!(CosFunc, COS, cos, cos, None);
 make_math_unary_udf!(CoshFunc, COSH, cosh, cosh, None);
 make_math_unary_udf!(DegreesFunc, DEGREES, degrees, to_degrees, None);
 
+make_math_unary_udf!(FloorFunc, FLOOR, floor, floor, Some(vec![Some(true)]));
+
 // Export the functions out of this package, both as expr_fn as well as a list 
of functions
 export_functions!(
     (
@@ -86,5 +94,9 @@ export_functions!(
     (cbrt, num, "cube root of a number"),
     (cos, num, "cosine"),
     (cosh, num, "hyperbolic cosine"),
-    (degrees, num, "converts radians to degrees")
+    (degrees, num, "converts radians to degrees"),
+    (gcd, x y, "greatest common divisor"),
+    (lcm, x y, "least common multiple"),
+    (floor, num, "nearest integer less than or equal to argument"),
+    (pi, , "Returns an approximate value of π")
 );
diff --git a/datafusion/functions/src/math/pi.rs 
b/datafusion/functions/src/math/pi.rs
new file mode 100644
index 0000000000..0801e79751
--- /dev/null
+++ b/datafusion/functions/src/math/pi.rs
@@ -0,0 +1,76 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::sync::Arc;
+
+use arrow::array::Float64Array;
+use arrow::datatypes::DataType;
+use arrow::datatypes::DataType::Float64;
+
+use datafusion_common::{exec_err, Result};
+use datafusion_expr::{ColumnarValue, FuncMonotonicity, Volatility};
+use datafusion_expr::{ScalarUDFImpl, Signature};
+
+#[derive(Debug)]
+pub struct PiFunc {
+    signature: Signature,
+}
+
+impl Default for PiFunc {
+    fn default() -> Self {
+        PiFunc::new()
+    }
+}
+
+impl PiFunc {
+    pub fn new() -> Self {
+        Self {
+            signature: Signature::exact(vec![], Volatility::Immutable),
+        }
+    }
+}
+
+impl ScalarUDFImpl for PiFunc {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        "pi"
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        Ok(Float64)
+    }
+
+    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+        if !matches!(&args[0], ColumnarValue::Array(_)) {
+            return exec_err!("Expect pi function to take no param");
+        }
+        let array = Float64Array::from_value(std::f64::consts::PI, 1);
+        Ok(ColumnarValue::Array(Arc::new(array)))
+    }
+
+    fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
+        Ok(Some(vec![Some(true)]))
+    }
+}
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs 
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 04de243fba..1ea8b9534e 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -19,9 +19,8 @@
 
 use std::sync::Arc;
 
-use crate::analyzer::AnalyzerRule;
-
 use arrow::datatypes::{DataType, IntervalUnit};
+
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{Transformed, TreeNodeRewriter};
 use datafusion_common::{
@@ -51,6 +50,8 @@ use datafusion_expr::{
     WindowFrameUnits,
 };
 
+use crate::analyzer::AnalyzerRule;
+
 #[derive(Default)]
 pub struct TypeCoercion {}
 
@@ -758,25 +759,25 @@ mod test {
     use std::any::Any;
     use std::sync::{Arc, OnceLock};
 
-    use crate::analyzer::type_coercion::{
-        coerce_case_expression, TypeCoercion, TypeCoercionRewriter,
-    };
-    use crate::test::assert_analyzed_plan_eq;
-
     use arrow::datatypes::{DataType, Field, TimeUnit};
+
     use datafusion_common::tree_node::{TransformedResult, TreeNode};
     use datafusion_common::{DFSchema, DFSchemaRef, Result, ScalarValue};
     use datafusion_expr::expr::{self, InSubquery, Like, ScalarFunction};
     use datafusion_expr::logical_plan::{EmptyRelation, Projection};
     use datafusion_expr::{
         cast, col, concat, concat_ws, create_udaf, is_true, lit,
-        AccumulatorFactoryFunction, AggregateFunction, AggregateUDF, 
BinaryExpr,
-        BuiltinScalarFunction, Case, ColumnarValue, Expr, ExprSchemable, 
Filter,
-        LogicalPlan, Operator, ScalarUDF, ScalarUDFImpl, Signature, 
SimpleAggregateUDF,
-        Subquery, Volatility,
+        AccumulatorFactoryFunction, AggregateFunction, AggregateUDF, 
BinaryExpr, Case,
+        ColumnarValue, Expr, ExprSchemable, Filter, LogicalPlan, Operator, 
ScalarUDF,
+        ScalarUDFImpl, Signature, SimpleAggregateUDF, Subquery, Volatility,
     };
     use datafusion_physical_expr::expressions::AvgAccumulator;
 
+    use crate::analyzer::type_coercion::{
+        coerce_case_expression, TypeCoercion, TypeCoercionRewriter,
+    };
+    use crate::test::assert_analyzed_plan_eq;
+
     fn empty() -> Arc<LogicalPlan> {
         Arc::new(LogicalPlan::EmptyRelation(EmptyRelation {
             produce_one_row: false,
@@ -875,14 +876,15 @@ mod test {
         // test that automatic argument type coercion for scalar functions work
         let empty = empty();
         let lit_expr = lit(10i64);
-        let fun: BuiltinScalarFunction = BuiltinScalarFunction::Floor;
+        let fun = ScalarUDF::new_from_impl(TestScalarUDF {});
         let scalar_function_expr =
-            Expr::ScalarFunction(ScalarFunction::new(fun, vec![lit_expr]));
+            Expr::ScalarFunction(ScalarFunction::new_udf(Arc::new(fun), 
vec![lit_expr]));
         let plan = LogicalPlan::Projection(Projection::try_new(
             vec![scalar_function_expr],
             empty,
         )?);
-        let expected = "Projection: floor(CAST(Int64(10) AS Float64))\n  
EmptyRelation";
+        let expected =
+            "Projection: TestScalarUDF(CAST(Int64(10) AS Float32))\n  
EmptyRelation";
         assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), &plan, expected)
     }
 
diff --git a/datafusion/physical-expr/src/equivalence/ordering.rs 
b/datafusion/physical-expr/src/equivalence/ordering.rs
index 1364d3a8c0..688cdf798b 100644
--- a/datafusion/physical-expr/src/equivalence/ordering.rs
+++ b/datafusion/physical-expr/src/equivalence/ordering.rs
@@ -15,10 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow_schema::SortOptions;
 use std::hash::Hash;
 use std::sync::Arc;
 
+use arrow_schema::SortOptions;
+
 use crate::equivalence::add_offset_to_expr;
 use crate::{LexOrdering, PhysicalExpr, PhysicalSortExpr};
 
@@ -220,6 +221,16 @@ fn resolve_overlap(orderings: &mut [LexOrdering], idx: 
usize, pre_idx: usize) ->
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow_schema::SortOptions;
+    use itertools::Itertools;
+
+    use datafusion_common::{DFSchema, Result};
+    use datafusion_expr::execution_props::ExecutionProps;
+    use datafusion_expr::{BuiltinScalarFunction, Operator, ScalarUDF};
+
     use crate::equivalence::tests::{
         convert_to_orderings, convert_to_sort_exprs, create_random_schema,
         create_test_params, generate_table_for_eq_properties, 
is_table_same_after_sort,
@@ -231,14 +242,8 @@ mod tests {
     use crate::expressions::Column;
     use crate::expressions::{col, BinaryExpr};
     use crate::functions::create_physical_expr;
+    use crate::utils::tests::TestScalarUDF;
     use crate::{PhysicalExpr, PhysicalSortExpr};
-    use arrow::datatypes::{DataType, Field, Schema};
-    use arrow_schema::SortOptions;
-    use datafusion_common::Result;
-    use datafusion_expr::execution_props::ExecutionProps;
-    use datafusion_expr::{BuiltinScalarFunction, Operator};
-    use itertools::Itertools;
-    use std::sync::Arc;
 
     #[test]
     fn test_ordering_satisfy() -> Result<()> {
@@ -281,17 +286,20 @@ mod tests {
         let col_d = &col("d", &test_schema)?;
         let col_e = &col("e", &test_schema)?;
         let col_f = &col("f", &test_schema)?;
-        let floor_a = &create_physical_expr(
-            &BuiltinScalarFunction::Floor,
+        let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
+        let floor_a = &crate::udf::create_physical_expr(
+            &test_fun,
             &[col("a", &test_schema)?],
             &test_schema,
-            &ExecutionProps::default(),
+            &[],
+            &DFSchema::empty(),
         )?;
-        let floor_f = &create_physical_expr(
-            &BuiltinScalarFunction::Floor,
+        let floor_f = &crate::udf::create_physical_expr(
+            &test_fun,
             &[col("f", &test_schema)?],
             &test_schema,
-            &ExecutionProps::default(),
+            &[],
+            &DFSchema::empty(),
         )?;
         let exp_a = &create_physical_expr(
             &BuiltinScalarFunction::Exp,
@@ -804,11 +812,13 @@ mod tests {
             let table_data_with_properties =
                 generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
 
-            let floor_a = create_physical_expr(
-                &BuiltinScalarFunction::Floor,
+            let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
+            let floor_a = crate::udf::create_physical_expr(
+                &test_fun,
                 &[col("a", &test_schema)?],
                 &test_schema,
-                &ExecutionProps::default(),
+                &[],
+                &DFSchema::empty(),
             )?;
             let a_plus_b = Arc::new(BinaryExpr::new(
                 col("a", &test_schema)?,
diff --git a/datafusion/physical-expr/src/equivalence/projection.rs 
b/datafusion/physical-expr/src/equivalence/projection.rs
index b8231a74c2..5efcf5942c 100644
--- a/datafusion/physical-expr/src/equivalence/projection.rs
+++ b/datafusion/physical-expr/src/equivalence/projection.rs
@@ -17,13 +17,14 @@
 
 use std::sync::Arc;
 
-use crate::expressions::Column;
-use crate::PhysicalExpr;
-
 use arrow::datatypes::SchemaRef;
+
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::Result;
 
+use crate::expressions::Column;
+use crate::PhysicalExpr;
+
 /// Stores the mapping between source expressions and target expressions for a
 /// projection.
 #[derive(Debug, Clone)]
@@ -111,7 +112,14 @@ impl ProjectionMapping {
 mod tests {
     use std::sync::Arc;
 
-    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow_schema::{SortOptions, TimeUnit};
+    use itertools::Itertools;
+
+    use datafusion_common::{DFSchema, Result};
+    use datafusion_expr::execution_props::ExecutionProps;
+    use datafusion_expr::{BuiltinScalarFunction, Operator, ScalarUDF};
+
     use crate::equivalence::tests::{
         apply_projection, convert_to_orderings, convert_to_orderings_owned,
         create_random_schema, generate_table_for_eq_properties, 
is_table_same_after_sort,
@@ -119,16 +127,11 @@ mod tests {
     };
     use crate::equivalence::EquivalenceProperties;
     use crate::expressions::{col, BinaryExpr};
-    use crate::functions::create_physical_expr;
+    use crate::udf::create_physical_expr;
+    use crate::utils::tests::TestScalarUDF;
     use crate::PhysicalSortExpr;
 
-    use arrow::datatypes::{DataType, Field, Schema};
-    use arrow_schema::{SortOptions, TimeUnit};
-    use datafusion_common::Result;
-    use datafusion_expr::execution_props::ExecutionProps;
-    use datafusion_expr::{BuiltinScalarFunction, Operator};
-
-    use itertools::Itertools;
+    use super::*;
 
     #[test]
     fn project_orderings() -> Result<()> {
@@ -646,7 +649,7 @@ mod tests {
             col_b.clone(),
         )) as Arc<dyn PhysicalExpr>;
 
-        let round_c = &create_physical_expr(
+        let round_c = &crate::functions::create_physical_expr(
             &BuiltinScalarFunction::Round,
             &[col_c.clone()],
             &schema,
@@ -973,11 +976,13 @@ mod tests {
             let table_data_with_properties =
                 generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
             // Floor(a)
+            let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
             let floor_a = create_physical_expr(
-                &BuiltinScalarFunction::Floor,
+                &test_fun,
                 &[col("a", &test_schema)?],
                 &test_schema,
-                &ExecutionProps::default(),
+                &[],
+                &DFSchema::empty(),
             )?;
             // a + b
             let a_plus_b = Arc::new(BinaryExpr::new(
@@ -1049,11 +1054,13 @@ mod tests {
             let table_data_with_properties =
                 generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
             // Floor(a)
+            let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
             let floor_a = create_physical_expr(
-                &BuiltinScalarFunction::Floor,
+                &test_fun,
                 &[col("a", &test_schema)?],
                 &test_schema,
-                &ExecutionProps::default(),
+                &[],
+                &DFSchema::empty(),
             )?;
             // a + b
             let a_plus_b = Arc::new(BinaryExpr::new(
diff --git a/datafusion/physical-expr/src/equivalence/properties.rs 
b/datafusion/physical-expr/src/equivalence/properties.rs
index 7ce540b267..c14c88d6c6 100644
--- a/datafusion/physical-expr/src/equivalence/properties.rs
+++ b/datafusion/physical-expr/src/equivalence/properties.rs
@@ -18,7 +18,13 @@
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
-use super::ordering::collapse_lex_ordering;
+use arrow_schema::{SchemaRef, SortOptions};
+use indexmap::{IndexMap, IndexSet};
+use itertools::Itertools;
+
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{JoinSide, JoinType, Result};
+
 use crate::equivalence::{
     collapse_lex_req, EquivalenceGroup, OrderingEquivalenceClass, 
ProjectionMapping,
 };
@@ -30,12 +36,7 @@ use crate::{
     PhysicalSortRequirement,
 };
 
-use arrow_schema::{SchemaRef, SortOptions};
-use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
-use datafusion_common::{JoinSide, JoinType, Result};
-
-use indexmap::{IndexMap, IndexSet};
-use itertools::Itertools;
+use super::ordering::collapse_lex_ordering;
 
 /// A `EquivalenceProperties` object stores useful information related to a 
schema.
 /// Currently, it keeps track of:
@@ -1296,7 +1297,13 @@ mod tests {
     use std::ops::Not;
     use std::sync::Arc;
 
-    use super::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow_schema::{Fields, SortOptions, TimeUnit};
+    use itertools::Itertools;
+
+    use datafusion_common::{DFSchema, Result};
+    use datafusion_expr::{Operator, ScalarUDF};
+
     use crate::equivalence::add_offset_to_expr;
     use crate::equivalence::tests::{
         convert_to_orderings, convert_to_sort_exprs, convert_to_sort_reqs,
@@ -1304,16 +1311,10 @@ mod tests {
         generate_table_for_eq_properties, is_table_same_after_sort, 
output_schema,
     };
     use crate::expressions::{col, BinaryExpr, Column};
-    use crate::functions::create_physical_expr;
+    use crate::utils::tests::TestScalarUDF;
     use crate::PhysicalSortExpr;
 
-    use arrow::datatypes::{DataType, Field, Schema};
-    use arrow_schema::{Fields, SortOptions, TimeUnit};
-    use datafusion_common::Result;
-    use datafusion_expr::execution_props::ExecutionProps;
-    use datafusion_expr::{BuiltinScalarFunction, Operator};
-
-    use itertools::Itertools;
+    use super::*;
 
     #[test]
     fn project_equivalence_properties_test() -> Result<()> {
@@ -1792,11 +1793,13 @@ mod tests {
             let table_data_with_properties =
                 generate_table_for_eq_properties(&eq_properties, N_ELEMENTS, 
N_DISTINCT)?;
 
-            let floor_a = create_physical_expr(
-                &BuiltinScalarFunction::Floor,
+            let test_fun = ScalarUDF::new_from_impl(TestScalarUDF::new());
+            let floor_a = crate::udf::create_physical_expr(
+                &test_fun,
                 &[col("a", &test_schema)?],
                 &test_schema,
-                &ExecutionProps::default(),
+                &[],
+                &DFSchema::empty(),
             )?;
             let a_plus_b = Arc::new(BinaryExpr::new(
                 col("a", &test_schema)?,
diff --git a/datafusion/physical-expr/src/functions.rs 
b/datafusion/physical-expr/src/functions.rs
index 770d918432..79d69b273d 100644
--- a/datafusion/physical-expr/src/functions.rs
+++ b/datafusion/physical-expr/src/functions.rs
@@ -184,16 +184,9 @@ pub fn create_physical_fun(
         BuiltinScalarFunction::Factorial => {
             Arc::new(|args| 
make_scalar_function_inner(math_expressions::factorial)(args))
         }
-        BuiltinScalarFunction::Floor => Arc::new(math_expressions::floor),
-        BuiltinScalarFunction::Gcd => {
-            Arc::new(|args| 
make_scalar_function_inner(math_expressions::gcd)(args))
-        }
         BuiltinScalarFunction::Iszero => {
             Arc::new(|args| 
make_scalar_function_inner(math_expressions::iszero)(args))
         }
-        BuiltinScalarFunction::Lcm => {
-            Arc::new(|args| 
make_scalar_function_inner(math_expressions::lcm)(args))
-        }
         BuiltinScalarFunction::Nanvl => {
             Arc::new(|args| 
make_scalar_function_inner(math_expressions::nanvl)(args))
         }
@@ -204,7 +197,6 @@ pub fn create_physical_fun(
         BuiltinScalarFunction::Trunc => {
             Arc::new(|args| 
make_scalar_function_inner(math_expressions::trunc)(args))
         }
-        BuiltinScalarFunction::Pi => Arc::new(math_expressions::pi),
         BuiltinScalarFunction::Power => {
             Arc::new(|args| 
make_scalar_function_inner(math_expressions::power)(args))
         }
@@ -573,7 +565,7 @@ mod tests {
         let execution_props = ExecutionProps::new();
         let schema = Schema::new(vec![Field::new("a", DataType::Int32, 
false)]);
 
-        let funs = [BuiltinScalarFunction::Pi, BuiltinScalarFunction::Random];
+        let funs = [BuiltinScalarFunction::Random];
 
         for fun in funs.iter() {
             create_physical_expr_with_type_coercion(fun, &[], &schema, 
&execution_props)?;
diff --git a/datafusion/physical-expr/src/math_expressions.rs 
b/datafusion/physical-expr/src/math_expressions.rs
index f8244ad952..384f8d87eb 100644
--- a/datafusion/physical-expr/src/math_expressions.rs
+++ b/datafusion/physical-expr/src/math_expressions.rs
@@ -19,7 +19,6 @@
 
 use std::any::type_name;
 use std::iter;
-use std::mem::swap;
 use std::sync::Arc;
 
 use arrow::array::ArrayRef;
@@ -161,7 +160,6 @@ math_unary_function!("atan", atan);
 math_unary_function!("asinh", asinh);
 math_unary_function!("acosh", acosh);
 math_unary_function!("atanh", atanh);
-math_unary_function!("floor", floor);
 math_unary_function!("ceil", ceil);
 math_unary_function!("exp", exp);
 math_unary_function!("ln", ln);
@@ -181,79 +179,6 @@ pub fn factorial(args: &[ArrayRef]) -> Result<ArrayRef> {
     }
 }
 
-/// Computes greatest common divisor using Binary GCD algorithm.
-fn compute_gcd(x: i64, y: i64) -> i64 {
-    let mut a = x.wrapping_abs();
-    let mut b = y.wrapping_abs();
-
-    if a == 0 {
-        return b;
-    }
-    if b == 0 {
-        return a;
-    }
-
-    let shift = (a | b).trailing_zeros();
-    a >>= shift;
-    b >>= shift;
-    a >>= a.trailing_zeros();
-
-    loop {
-        b >>= b.trailing_zeros();
-        if a > b {
-            swap(&mut a, &mut b);
-        }
-
-        b -= a;
-
-        if b == 0 {
-            return a << shift;
-        }
-    }
-}
-
-/// Gcd SQL function
-pub fn gcd(args: &[ArrayRef]) -> Result<ArrayRef> {
-    match args[0].data_type() {
-        DataType::Int64 => Ok(Arc::new(make_function_inputs2!(
-            &args[0],
-            &args[1],
-            "x",
-            "y",
-            Int64Array,
-            Int64Array,
-            { compute_gcd }
-        )) as ArrayRef),
-        other => exec_err!("Unsupported data type {other:?} for function gcd"),
-    }
-}
-
-/// Lcm SQL function
-pub fn lcm(args: &[ArrayRef]) -> Result<ArrayRef> {
-    let compute_lcm = |x: i64, y: i64| {
-        let a = x.wrapping_abs();
-        let b = y.wrapping_abs();
-
-        if a == 0 || b == 0 {
-            return 0;
-        }
-        a / compute_gcd(a, b) * b
-    };
-
-    match args[0].data_type() {
-        DataType::Int64 => Ok(Arc::new(make_function_inputs2!(
-            &args[0],
-            &args[1],
-            "x",
-            "y",
-            Int64Array,
-            Int64Array,
-            { compute_lcm }
-        )) as ArrayRef),
-        other => exec_err!("Unsupported data type {other:?} for function lcm"),
-    }
-}
-
 /// Nanvl SQL function
 pub fn nanvl(args: &[ArrayRef]) -> Result<ArrayRef> {
     match args[0].data_type() {
@@ -345,15 +270,6 @@ pub fn iszero(args: &[ArrayRef]) -> Result<ArrayRef> {
     }
 }
 
-/// Pi SQL function
-pub fn pi(args: &[ColumnarValue]) -> Result<ColumnarValue> {
-    if !matches!(&args[0], ColumnarValue::Array(_)) {
-        return exec_err!("Expect pi function to take no param");
-    }
-    let array = Float64Array::from_value(std::f64::consts::PI, 1);
-    Ok(ColumnarValue::Array(Arc::new(array)))
-}
-
 /// Random SQL function
 pub fn random(args: &[ColumnarValue]) -> Result<ColumnarValue> {
     let len: usize = match &args[0] {
@@ -808,40 +724,6 @@ mod tests {
         assert_eq!(ints, &expected);
     }
 
-    #[test]
-    fn test_gcd_i64() {
-        let args: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 3, 25, -16])), // x
-            Arc::new(Int64Array::from(vec![0, -2, 15, 8])),  // y
-        ];
-
-        let result = gcd(&args).expect("failed to initialize function gcd");
-        let ints = as_int64_array(&result).expect("failed to initialize 
function gcd");
-
-        assert_eq!(ints.len(), 4);
-        assert_eq!(ints.value(0), 0);
-        assert_eq!(ints.value(1), 1);
-        assert_eq!(ints.value(2), 5);
-        assert_eq!(ints.value(3), 8);
-    }
-
-    #[test]
-    fn test_lcm_i64() {
-        let args: Vec<ArrayRef> = vec![
-            Arc::new(Int64Array::from(vec![0, 3, 25, -16])), // x
-            Arc::new(Int64Array::from(vec![0, -2, 15, 8])),  // y
-        ];
-
-        let result = lcm(&args).expect("failed to initialize function lcm");
-        let ints = as_int64_array(&result).expect("failed to initialize 
function lcm");
-
-        assert_eq!(ints.len(), 4);
-        assert_eq!(ints.value(0), 0);
-        assert_eq!(ints.value(1), 6);
-        assert_eq!(ints.value(2), 75);
-        assert_eq!(ints.value(3), 16);
-    }
-
     #[test]
     fn test_cot_f32() {
         let args: Vec<ArrayRef> =
diff --git a/datafusion/physical-expr/src/udf.rs 
b/datafusion/physical-expr/src/udf.rs
index 4fc94bfa15..368dfdf92f 100644
--- a/datafusion/physical-expr/src/udf.rs
+++ b/datafusion/physical-expr/src/udf.rs
@@ -16,14 +16,17 @@
 // under the License.
 
 //! UDF support
-use crate::{PhysicalExpr, ScalarFunctionExpr};
+use std::sync::Arc;
+
 use arrow_schema::Schema;
+
 use datafusion_common::{DFSchema, Result};
 pub use datafusion_expr::ScalarUDF;
 use datafusion_expr::{
     type_coercion::functions::data_types, Expr, ScalarFunctionDefinition,
 };
-use std::sync::Arc;
+
+use crate::{PhysicalExpr, ScalarFunctionExpr};
 
 /// Create a physical expression of the UDF.
 ///
@@ -60,58 +63,18 @@ pub fn create_physical_expr(
 
 #[cfg(test)]
 mod tests {
-    use arrow_schema::{DataType, Schema};
+    use arrow_schema::Schema;
+
     use datafusion_common::{DFSchema, Result};
-    use datafusion_expr::{
-        ColumnarValue, FuncMonotonicity, ScalarUDF, ScalarUDFImpl, Signature, 
Volatility,
-    };
+    use datafusion_expr::ScalarUDF;
 
+    use crate::utils::tests::TestScalarUDF;
     use crate::ScalarFunctionExpr;
 
     use super::create_physical_expr;
 
     #[test]
     fn test_functions() -> Result<()> {
-        #[derive(Debug, Clone)]
-        struct TestScalarUDF {
-            signature: Signature,
-        }
-
-        impl TestScalarUDF {
-            fn new() -> Self {
-                let signature =
-                    Signature::exact(vec![DataType::Float64], 
Volatility::Immutable);
-
-                Self { signature }
-            }
-        }
-
-        impl ScalarUDFImpl for TestScalarUDF {
-            fn as_any(&self) -> &dyn std::any::Any {
-                self
-            }
-
-            fn name(&self) -> &str {
-                "my_fn"
-            }
-
-            fn signature(&self) -> &Signature {
-                &self.signature
-            }
-
-            fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> 
{
-                Ok(DataType::Float64)
-            }
-
-            fn invoke(&self, _args: &[ColumnarValue]) -> Result<ColumnarValue> 
{
-                unimplemented!("my_fn is not implemented")
-            }
-
-            fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
-                Ok(Some(vec![Some(true)]))
-            }
-        }
-
         // create and register the udf
         let udf = ScalarUDF::from(TestScalarUDF::new());
 
diff --git a/datafusion/physical-expr/src/utils/mod.rs 
b/datafusion/physical-expr/src/utils/mod.rs
index e55bc3d156..d7bebbff89 100644
--- a/datafusion/physical-expr/src/utils/mod.rs
+++ b/datafusion/physical-expr/src/utils/mod.rs
@@ -256,7 +256,9 @@ pub fn merge_vectors(
 }
 
 #[cfg(test)]
-mod tests {
+pub(crate) mod tests {
+    use arrow_array::{ArrayRef, Float32Array, Float64Array};
+    use std::any::Any;
     use std::fmt::{Display, Formatter};
     use std::sync::Arc;
 
@@ -265,10 +267,103 @@ mod tests {
     use crate::PhysicalSortExpr;
 
     use arrow_schema::{DataType, Field, Schema};
-    use datafusion_common::{Result, ScalarValue};
+    use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
 
+    use datafusion_expr::{
+        ColumnarValue, FuncMonotonicity, ScalarUDFImpl, Signature, Volatility,
+    };
     use petgraph::visit::Bfs;
 
+    #[derive(Debug, Clone)]
+    pub struct TestScalarUDF {
+        signature: Signature,
+    }
+
+    impl TestScalarUDF {
+        pub fn new() -> Self {
+            use DataType::*;
+            Self {
+                signature: Signature::uniform(
+                    1,
+                    vec![Float64, Float32],
+                    Volatility::Immutable,
+                ),
+            }
+        }
+    }
+
+    impl ScalarUDFImpl for TestScalarUDF {
+        fn as_any(&self) -> &dyn Any {
+            self
+        }
+        fn name(&self) -> &str {
+            "test-scalar-udf"
+        }
+
+        fn signature(&self) -> &Signature {
+            &self.signature
+        }
+
+        fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
+            let arg_type = &arg_types[0];
+
+            match arg_type {
+                DataType::Float32 => Ok(DataType::Float32),
+                _ => Ok(DataType::Float64),
+            }
+        }
+
+        fn monotonicity(&self) -> Result<Option<FuncMonotonicity>> {
+            Ok(Some(vec![Some(true)]))
+        }
+
+        fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
+            let args = ColumnarValue::values_to_arrays(args)?;
+
+            let arr: ArrayRef = match args[0].data_type() {
+                DataType::Float64 => Arc::new({
+                    let arg = &args[0]
+                        .as_any()
+                        .downcast_ref::<Float64Array>()
+                        .ok_or_else(|| {
+                            DataFusionError::Internal(format!(
+                                "could not cast {} to {}",
+                                self.name(),
+                                std::any::type_name::<Float64Array>()
+                            ))
+                        })?;
+
+                    arg.iter()
+                        .map(|a| a.map(f64::floor))
+                        .collect::<Float64Array>()
+                }),
+                DataType::Float32 => Arc::new({
+                    let arg = &args[0]
+                        .as_any()
+                        .downcast_ref::<Float32Array>()
+                        .ok_or_else(|| {
+                            DataFusionError::Internal(format!(
+                                "could not cast {} to {}",
+                                self.name(),
+                                std::any::type_name::<Float32Array>()
+                            ))
+                        })?;
+
+                    arg.iter()
+                        .map(|a| a.map(f32::floor))
+                        .collect::<Float32Array>()
+                }),
+                other => {
+                    return exec_err!(
+                        "Unsupported data type {other:?} for function {}",
+                        self.name()
+                    );
+                }
+            };
+            Ok(ColumnarValue::Array(arr))
+        }
+    }
+
     #[derive(Clone)]
     struct DummyProperty {
         expr_type: String,
diff --git a/datafusion/proto/proto/datafusion.proto 
b/datafusion/proto/proto/datafusion.proto
index 7f967657f5..b656bededc 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -550,7 +550,7 @@ enum ScalarFunction {
   // 6 was Cos
   // 7 was Digest
   Exp = 8;
-  Floor = 9;
+  // 9 was Floor
   // 10 was Ln
   Log = 11;
   // 12 was Log10
@@ -621,12 +621,12 @@ enum ScalarFunction {
   // 77 was Sinh
   // 78 was Cosh
   // Tanh = 79
-  Pi = 80;
+  // 80 was Pi
   // 81 was Degrees
   // 82 was Radians
   Factorial = 83;
-  Lcm = 84;
-  Gcd = 85;
+  // 84 was Lcm
+  // 85 was Gcd
   // 86 was ArrayAppend
   // 87 was ArrayConcat
   // 88 was ArrayDims
diff --git a/datafusion/proto/src/generated/pbjson.rs 
b/datafusion/proto/src/generated/pbjson.rs
index 966d7f7f74..c13ae045bd 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -22794,7 +22794,6 @@ impl serde::Serialize for ScalarFunction {
             Self::Unknown => "unknown",
             Self::Ceil => "Ceil",
             Self::Exp => "Exp",
-            Self::Floor => "Floor",
             Self::Log => "Log",
             Self::Round => "Round",
             Self::Trunc => "Trunc",
@@ -22804,10 +22803,7 @@ impl serde::Serialize for ScalarFunction {
             Self::Random => "Random",
             Self::Coalesce => "Coalesce",
             Self::Power => "Power",
-            Self::Pi => "Pi",
             Self::Factorial => "Factorial",
-            Self::Lcm => "Lcm",
-            Self::Gcd => "Gcd",
             Self::Cot => "Cot",
             Self::Nanvl => "Nanvl",
             Self::Iszero => "Iszero",
@@ -22826,7 +22822,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
             "unknown",
             "Ceil",
             "Exp",
-            "Floor",
             "Log",
             "Round",
             "Trunc",
@@ -22836,10 +22831,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
             "Random",
             "Coalesce",
             "Power",
-            "Pi",
             "Factorial",
-            "Lcm",
-            "Gcd",
             "Cot",
             "Nanvl",
             "Iszero",
@@ -22887,7 +22879,6 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
                     "unknown" => Ok(ScalarFunction::Unknown),
                     "Ceil" => Ok(ScalarFunction::Ceil),
                     "Exp" => Ok(ScalarFunction::Exp),
-                    "Floor" => Ok(ScalarFunction::Floor),
                     "Log" => Ok(ScalarFunction::Log),
                     "Round" => Ok(ScalarFunction::Round),
                     "Trunc" => Ok(ScalarFunction::Trunc),
@@ -22897,10 +22888,7 @@ impl<'de> serde::Deserialize<'de> for ScalarFunction {
                     "Random" => Ok(ScalarFunction::Random),
                     "Coalesce" => Ok(ScalarFunction::Coalesce),
                     "Power" => Ok(ScalarFunction::Power),
-                    "Pi" => Ok(ScalarFunction::Pi),
                     "Factorial" => Ok(ScalarFunction::Factorial),
-                    "Lcm" => Ok(ScalarFunction::Lcm),
-                    "Gcd" => Ok(ScalarFunction::Gcd),
                     "Cot" => Ok(ScalarFunction::Cot),
                     "Nanvl" => Ok(ScalarFunction::Nanvl),
                     "Iszero" => Ok(ScalarFunction::Iszero),
diff --git a/datafusion/proto/src/generated/prost.rs 
b/datafusion/proto/src/generated/prost.rs
index c94aa1f4ed..092d5c59d0 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -2849,7 +2849,7 @@ pub enum ScalarFunction {
     /// 6 was Cos
     /// 7 was Digest
     Exp = 8,
-    Floor = 9,
+    /// 9 was Floor
     /// 10 was Ln
     Log = 11,
     /// 12 was Log10
@@ -2920,12 +2920,12 @@ pub enum ScalarFunction {
     /// 77 was Sinh
     /// 78 was Cosh
     /// Tanh = 79
-    Pi = 80,
+    /// 80 was Pi
     /// 81 was Degrees
     /// 82 was Radians
     Factorial = 83,
-    Lcm = 84,
-    Gcd = 85,
+    /// 84 was Lcm
+    /// 85 was Gcd
     /// 86 was ArrayAppend
     /// 87 was ArrayConcat
     /// 88 was ArrayDims
@@ -2989,7 +2989,6 @@ impl ScalarFunction {
             ScalarFunction::Unknown => "unknown",
             ScalarFunction::Ceil => "Ceil",
             ScalarFunction::Exp => "Exp",
-            ScalarFunction::Floor => "Floor",
             ScalarFunction::Log => "Log",
             ScalarFunction::Round => "Round",
             ScalarFunction::Trunc => "Trunc",
@@ -2999,10 +2998,7 @@ impl ScalarFunction {
             ScalarFunction::Random => "Random",
             ScalarFunction::Coalesce => "Coalesce",
             ScalarFunction::Power => "Power",
-            ScalarFunction::Pi => "Pi",
             ScalarFunction::Factorial => "Factorial",
-            ScalarFunction::Lcm => "Lcm",
-            ScalarFunction::Gcd => "Gcd",
             ScalarFunction::Cot => "Cot",
             ScalarFunction::Nanvl => "Nanvl",
             ScalarFunction::Iszero => "Iszero",
@@ -3015,7 +3011,6 @@ impl ScalarFunction {
             "unknown" => Some(Self::Unknown),
             "Ceil" => Some(Self::Ceil),
             "Exp" => Some(Self::Exp),
-            "Floor" => Some(Self::Floor),
             "Log" => Some(Self::Log),
             "Round" => Some(Self::Round),
             "Trunc" => Some(Self::Trunc),
@@ -3025,10 +3020,7 @@ impl ScalarFunction {
             "Random" => Some(Self::Random),
             "Coalesce" => Some(Self::Coalesce),
             "Power" => Some(Self::Power),
-            "Pi" => Some(Self::Pi),
             "Factorial" => Some(Self::Factorial),
-            "Lcm" => Some(Self::Lcm),
-            "Gcd" => Some(Self::Gcd),
             "Cot" => Some(Self::Cot),
             "Nanvl" => Some(Self::Nanvl),
             "Iszero" => Some(Self::Iszero),
diff --git a/datafusion/proto/src/logical_plan/from_proto.rs 
b/datafusion/proto/src/logical_plan/from_proto.rs
index 96b3b5942e..9c24a39418 100644
--- a/datafusion/proto/src/logical_plan/from_proto.rs
+++ b/datafusion/proto/src/logical_plan/from_proto.rs
@@ -39,9 +39,9 @@ use datafusion_expr::window_frame::{check_window_frame, 
regularize_window_order_
 use datafusion_expr::{
     ceil, coalesce, concat_expr, concat_ws_expr, cot, ends_with, exp,
     expr::{self, InList, Sort, WindowFunction},
-    factorial, floor, gcd, initcap, iszero, lcm, log,
+    factorial, initcap, iszero, log,
     logical_plan::{PlanType, StringifiedPlan},
-    nanvl, pi, power, random, round, trunc, AggregateFunction, Between, 
BinaryExpr,
+    nanvl, power, random, round, trunc, AggregateFunction, Between, BinaryExpr,
     BuiltInWindowFunction, BuiltinScalarFunction, Case, Cast, Expr, 
GetFieldAccess,
     GetIndexedField, GroupingSet,
     GroupingSet::GroupingSets,
@@ -423,9 +423,6 @@ impl From<&protobuf::ScalarFunction> for 
BuiltinScalarFunction {
             ScalarFunction::Exp => Self::Exp,
             ScalarFunction::Log => Self::Log,
             ScalarFunction::Factorial => Self::Factorial,
-            ScalarFunction::Gcd => Self::Gcd,
-            ScalarFunction::Lcm => Self::Lcm,
-            ScalarFunction::Floor => Self::Floor,
             ScalarFunction::Ceil => Self::Ceil,
             ScalarFunction::Round => Self::Round,
             ScalarFunction::Trunc => Self::Trunc,
@@ -435,7 +432,6 @@ impl From<&protobuf::ScalarFunction> for 
BuiltinScalarFunction {
             ScalarFunction::InitCap => Self::InitCap,
             ScalarFunction::Random => Self::Random,
             ScalarFunction::Coalesce => Self::Coalesce,
-            ScalarFunction::Pi => Self::Pi,
             ScalarFunction::Power => Self::Power,
             ScalarFunction::Nanvl => Self::Nanvl,
             ScalarFunction::Iszero => Self::Iszero,
@@ -1301,9 +1297,6 @@ pub fn parse_expr(
             match scalar_function {
                 ScalarFunction::Unknown => Err(proto_error("Unknown scalar 
function")),
                 ScalarFunction::Exp => Ok(exp(parse_expr(&args[0], registry, 
codec)?)),
-                ScalarFunction::Floor => {
-                    Ok(floor(parse_expr(&args[0], registry, codec)?))
-                }
                 ScalarFunction::Factorial => {
                     Ok(factorial(parse_expr(&args[0], registry, codec)?))
                 }
@@ -1313,14 +1306,6 @@ pub fn parse_expr(
                 ScalarFunction::InitCap => {
                     Ok(initcap(parse_expr(&args[0], registry, codec)?))
                 }
-                ScalarFunction::Gcd => Ok(gcd(
-                    parse_expr(&args[0], registry, codec)?,
-                    parse_expr(&args[1], registry, codec)?,
-                )),
-                ScalarFunction::Lcm => Ok(lcm(
-                    parse_expr(&args[0], registry, codec)?,
-                    parse_expr(&args[1], registry, codec)?,
-                )),
                 ScalarFunction::Random => Ok(random()),
                 ScalarFunction::Concat => {
                     Ok(concat_expr(parse_exprs(args, registry, codec)?))
@@ -1335,7 +1320,6 @@ pub fn parse_expr(
                 ScalarFunction::Coalesce => {
                     Ok(coalesce(parse_exprs(args, registry, codec)?))
                 }
-                ScalarFunction::Pi => Ok(pi()),
                 ScalarFunction::Power => Ok(power(
                     parse_expr(&args[0], registry, codec)?,
                     parse_expr(&args[1], registry, codec)?,
diff --git a/datafusion/proto/src/logical_plan/to_proto.rs 
b/datafusion/proto/src/logical_plan/to_proto.rs
index a10edb3932..bd964b43d4 100644
--- a/datafusion/proto/src/logical_plan/to_proto.rs
+++ b/datafusion/proto/src/logical_plan/to_proto.rs
@@ -1410,10 +1410,7 @@ impl TryFrom<&BuiltinScalarFunction> for 
protobuf::ScalarFunction {
             BuiltinScalarFunction::Cot => Self::Cot,
             BuiltinScalarFunction::Exp => Self::Exp,
             BuiltinScalarFunction::Factorial => Self::Factorial,
-            BuiltinScalarFunction::Gcd => Self::Gcd,
-            BuiltinScalarFunction::Lcm => Self::Lcm,
             BuiltinScalarFunction::Log => Self::Log,
-            BuiltinScalarFunction::Floor => Self::Floor,
             BuiltinScalarFunction::Ceil => Self::Ceil,
             BuiltinScalarFunction::Round => Self::Round,
             BuiltinScalarFunction::Trunc => Self::Trunc,
@@ -1423,7 +1420,6 @@ impl TryFrom<&BuiltinScalarFunction> for 
protobuf::ScalarFunction {
             BuiltinScalarFunction::InitCap => Self::InitCap,
             BuiltinScalarFunction::Random => Self::Random,
             BuiltinScalarFunction::Coalesce => Self::Coalesce,
-            BuiltinScalarFunction::Pi => Self::Pi,
             BuiltinScalarFunction::Power => Self::Power,
             BuiltinScalarFunction::Nanvl => Self::Nanvl,
             BuiltinScalarFunction::Iszero => Self::Iszero,
diff --git a/datafusion/sql/src/expr/function.rs 
b/datafusion/sql/src/expr/function.rs
index e97eb1a32b..4bf0906685 100644
--- a/datafusion/sql/src/expr/function.rs
+++ b/datafusion/sql/src/expr/function.rs
@@ -18,7 +18,8 @@
 use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
 use arrow_schema::DataType;
 use datafusion_common::{
-    not_impl_err, plan_datafusion_err, plan_err, DFSchema, Dependency, Result,
+    internal_datafusion_err, not_impl_err, plan_datafusion_err, plan_err, 
DFSchema,
+    Dependency, Result,
 };
 use datafusion_expr::window_frame::{check_window_frame, 
regularize_window_order_by};
 use datafusion_expr::{
@@ -264,6 +265,23 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         plan_err!("Invalid function '{name}'.\nDid you mean 
'{suggested_func_name}'?")
     }
 
+    pub(super) fn sql_fn_name_to_expr(
+        &self,
+        expr: SQLExpr,
+        fn_name: &str,
+        schema: &DFSchema,
+        planner_context: &mut PlannerContext,
+    ) -> Result<Expr> {
+        let fun = self
+            .context_provider
+            .get_function_meta(fn_name)
+            .ok_or_else(|| {
+                internal_datafusion_err!("Unable to find expected '{fn_name}' 
function")
+            })?;
+        let args = vec![self.sql_expr_to_logical_expr(expr, schema, 
planner_context)?];
+        Ok(Expr::ScalarFunction(ScalarFunction::new_udf(fun, args)))
+    }
+
     pub(super) fn sql_named_function_to_expr(
         &self,
         expr: SQLExpr,
diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs
index c2f72720af..7763fa2d8d 100644
--- a/datafusion/sql/src/expr/mod.rs
+++ b/datafusion/sql/src/expr/mod.rs
@@ -518,12 +518,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             SQLExpr::Floor {
                 expr,
                 field: _field,
-            } => self.sql_named_function_to_expr(
-                *expr,
-                BuiltinScalarFunction::Floor,
-                schema,
-                planner_context,
-            ),
+            } => self.sql_fn_name_to_expr(*expr, "floor", schema, 
planner_context),
             SQLExpr::Ceil {
                 expr,
                 field: _field,

Reply via email to