alamb commented on code in PR #9333:
URL: https://github.com/apache/arrow-datafusion/pull/9333#discussion_r1509945936


##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -635,6 +640,102 @@ async fn verify_udf_return_type() -> Result<()> {
     Ok(())
 }
 
+#[derive(Debug, Default)]
+struct MockFunctionFactory {
+    pub captured_expr: Mutex<Option<Expr>>,
+}
+
+#[async_trait::async_trait]
+impl FunctionFactory for MockFunctionFactory {
+    #[doc = r" Crates and registers a function from [CreateFunction] 
statement"]
+    #[must_use]
+    #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+    async fn create(
+        &self,
+        _config: &SessionConfig,
+        statement: CreateFunction,
+    ) -> datafusion::error::Result<RegisterFunction> {
+        // this function is a mock for testing
+        // `CreateFunction` should be used to derive this function
+
+        let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| {
+            let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?;
+            let base =
+                datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed");
+            let exponent =
+                datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed");
+
+            let array = base
+                .iter()
+                .zip(exponent.iter())
+                .map(|(base, exponent)| match (base, exponent) {
+                    (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)),
+                    _ => None,
+                })
+                .collect::<arrow_array::Float64Array>();
+            Ok(datafusion_expr::ColumnarValue::from(
+                Arc::new(array) as arrow_array::ArrayRef
+            ))
+        });
+
+        let args = statement.args.unwrap();
+        let mock_udf = create_udf(
+            &statement.name,
+            vec![args[0].data_type.clone(), args[1].data_type.clone()],
+            Arc::new(statement.return_type.unwrap()),
+            datafusion_expr::Volatility::Immutable,
+            mock_add,
+        );
+
+        // capture expression so we can verify
+        // it has been parsed
+        *self.captured_expr.lock() = statement.params.return_;
+
+        Ok(RegisterFunction::Scalar(Arc::new(mock_udf)))
+    }
+}
+
+#[tokio::test]
+async fn create_scalar_function_from_sql_statement() -> Result<()> {
+    let function_factory = Arc::new(MockFunctionFactory::default());
+    let runtime_config = RuntimeConfig::new();
+    let runtime_environment = RuntimeEnv::new(runtime_config).unwrap();
+    let session_config =
+        SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL");
+
+    let state =
+        SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment))
+            .with_function_factory(function_factory.clone());
+
+    let ctx = SessionContext::new_with_state(state);
+
+    let sql = r#"
+    CREATE FUNCTION better_add(DOUBLE, DOUBLE)
+        RETURNS DOUBLE
+        RETURN $1 + $2
+    "#;
+    let _ = ctx.sql(sql).await?;
+
+    ctx.sql("select better_add(2.0, 2.0)").await?.show().await?;
+
+    // check that the sql expr has been converted to a datafusion expr
+    let captured_expression = function_factory.captured_expr.lock().clone().unwrap();
+
+    // is there some better way to test this?

Review Comment:
   I think this is fine
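   
   If we did want a stronger check, one (untested) alternative sketch would be to compare against a programmatically built `Expr` rather than its string form. This is hypothetical and assumes `DOUBLE` is planned as `Float64` placeholders:
   ```
   use datafusion_expr::expr::Placeholder;

   // build the expected expression `$1 + $2` directly
   let expected = Expr::Placeholder(Placeholder::new("$1".into(), Some(DataType::Float64)))
       + Expr::Placeholder(Placeholder::new("$2".into(), Some(DataType::Float64)));
   assert_eq!(expected, captured_expression);
   ```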



##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -303,3 +321,66 @@ pub struct DropCatalogSchema {
     /// Dummy schema
     pub schema: DFSchemaRef,
 }
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunction {
+    // TODO: There is an open question: should we expose sqlparser types or redefine them here?
+    //       At the moment it makes more sense to expose sqlparser types and
+    //       leave it to the user to convert them as needed
+    pub or_replace: bool,
+    pub temporary: bool,
+    pub name: String,
+    pub args: Option<Vec<OperateFunctionArg>>,
+    pub return_type: Option<DataType>,
+    // TODO: move this to new struct here
+    pub params: CreateFunctionBody,
+    //pub body: String,
+    /// Dummy schema
+    pub schema: DFSchemaRef,

Review Comment:
   Why is this needed?



##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -635,6 +640,102 @@ async fn verify_udf_return_type() -> Result<()> {
     Ok(())
 }
 
+#[derive(Debug, Default)]
+struct MockFunctionFactory {
+    pub captured_expr: Mutex<Option<Expr>>,
+}
+
+#[async_trait::async_trait]
+impl FunctionFactory for MockFunctionFactory {
+    #[doc = r" Crates and registers a function from [CreateFunction] 
statement"]
+    #[must_use]
+    #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+    async fn create(
+        &self,
+        _config: &SessionConfig,
+        statement: CreateFunction,
+    ) -> datafusion::error::Result<RegisterFunction> {
+        // this function is a mock for testing
+        // `CreateFunction` should be used to derive this function
+

Review Comment:
   ```suggestion
           // In this example, we always create a function that adds its arguments
           // with the name specified in `CREATE FUNCTION`. In a real implementation
           // the body of the created UDF would also likely be a function of the contents
           // of the `CreateFunction`
   ```



##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -303,3 +321,66 @@ pub struct DropCatalogSchema {
     /// Dummy schema
     pub schema: DFSchemaRef,
 }
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunction {
+    // TODO: There is an open question: should we expose sqlparser types or redefine them here?
+    //       At the moment it makes more sense to expose sqlparser types and
+    //       leave it to the user to convert them as needed
+    pub or_replace: bool,
+    pub temporary: bool,
+    pub name: String,
+    pub args: Option<Vec<OperateFunctionArg>>,
+    pub return_type: Option<DataType>,
+    // TODO: move this to new struct here
+    pub params: CreateFunctionBody,
+    //pub body: String,
+    /// Dummy schema
+    pub schema: DFSchemaRef,
+}
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct OperateFunctionArg {
+    // it is not really supported so no need to have it here
+    // currently
+    // pub mode: Option<ArgMode>,
+    pub name: Option<Ident>,
+    pub data_type: DataType,
+    pub default_expr: Option<Expr>,
+}
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunctionBody {
+    /// LANGUAGE lang_name
+    pub language: Option<Ident>,
+    /// IMMUTABLE | STABLE | VOLATILE
+    pub behavior: Option<Volatility>,
+    /// AS 'definition'
+    pub as_: Option<DefinitionStatement>,
+    /// RETURN expression
+    pub return_: Option<Expr>,
+}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub enum DefinitionStatement {
+    SingleQuotedDef(String),
+    DoubleDollarDef(String),
+}
+
+impl From<sqlparser::ast::FunctionDefinition> for DefinitionStatement {
+    fn from(value: sqlparser::ast::FunctionDefinition) -> Self {
+        match value {
+            sqlparser::ast::FunctionDefinition::SingleQuotedDef(s) => {
+                Self::SingleQuotedDef(s)
+            }
+            sqlparser::ast::FunctionDefinition::DoubleDollarDef(s) => {
+                Self::DoubleDollarDef(s)
+            }
+        }
+    }
+}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct DropFunction {
+    pub name: String,
+    pub if_exists: bool,
+    pub schema: DFSchemaRef,

Review Comment:
   Does this really need a schema?



##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -1568,6 +1650,16 @@ impl SessionState {
         self
     }
 
+    /// Registers `CREATE FUNCTION` statement handler implementing
+    /// [`FunctionFactory`] trait.

Review Comment:
   ```suggestion
       /// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements
   ```



##########
datafusion/sql/src/statement.rs:
##########
@@ -625,8 +626,136 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 });
                 Ok(LogicalPlan::Statement(statement))
             }
+            Statement::CreateFunction {
+                or_replace,
+                temporary,
+                name,
+                args,
+                return_type,
+                params,
+            } => {
+                let return_type = match return_type {
+                    Some(t) => Some(self.convert_data_type(&t)?),
+                    None => None,
+                };
+                let mut planner_context = PlannerContext::new();
+                let empty_schema = &DFSchema::empty();
+
+                let args = match args {
+                    Some(function_args) => {
+                        let function_args = function_args
+                            .into_iter()
+                            .map(|arg| {
+                                let data_type = self.convert_data_type(&arg.data_type)?;
+
+                                let default_expr = match arg.default_expr {
+                                    Some(expr) => Some(self.sql_to_expr(
+                                        expr,
+                                        empty_schema,
+                                        &mut planner_context,
+                                    )?),
+                                    None => None,
+                                };
+                                Ok(OperateFunctionArg {
+                                    name: arg.name,
+                                    default_expr,
+                                    data_type,
+                                })
+                            })
+                            .collect::<Result<Vec<OperateFunctionArg>>>();
+                        Some(function_args?)
+                    }
+                    None => None,
+                };
+
+                // Not sure if this is the correct way to generate the name:
+                // a postgresql function definition may have a schema part as well.
+                // datafusion at the moment does a lookup based on the given string, so
+                // `schema_name.function_name` will work even if there is no `schema_name`
+                let name: String = name
+                    .0
+                    .into_iter()
+                    .map(|i| i.value)
+                    .collect::<Vec<String>>()
+                    .join(".");

Review Comment:
   
   I recommend in this case that we throw an error if someone tries to create a qualified function; if it is important to support qualified function names in the future we can revisit the issue then.
   
   So
   ```
   CREATE FUNCTION foo
   ```
   would work
   
   but
   ```
   CREATE FUNCTION my.foo ... 
   ```
   
   would error (a "not yet implemented" error)
   
   This is the corresponding code for table references: https://github.com/apache/arrow-datafusion/blob/dd4263f843e093c807d63edf73a571b1ba2669b5/datafusion/common/src/column.rs#L71-L99 and the corresponding API is https://docs.rs/datafusion/latest/datafusion/sql/enum.TableReference.html
   
   But at the moment DataFusion doesn't put functions in a schema or catalog; it simply puts them in a global namespace.
   
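   As a concrete (hypothetical) sketch of that check in the planner, assuming `name` is the `ObjectName` destructured from `Statement::CreateFunction` and reusing the `not_impl_err!` macro already used in this match:
   ```
   // sketch only: reject qualified function names until functions
   // are actually resolved through a schema / catalog
   let name = match name.0.as_slice() {
       [part] => part.value.clone(),
       _ => return not_impl_err!("Qualified function names are not supported: {name}"),
   };
   ```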



##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -303,3 +321,66 @@ pub struct DropCatalogSchema {
     /// Dummy schema
     pub schema: DFSchemaRef,
 }
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunction {

Review Comment:
   ```suggestion
   /// Arguments passed to `CREATE FUNCTION`
   ///
   /// Note this is meant to be the same as sqlparser's [`sqlparser::ast::Statement::CreateFunction`]
   #[derive(Clone, PartialEq, Eq, Hash, Debug)]
   pub struct CreateFunction {
   ```



##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -303,3 +321,66 @@ pub struct DropCatalogSchema {
     /// Dummy schema
     pub schema: DFSchemaRef,
 }
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunction {
+    // TODO: There is an open question: should we expose sqlparser types or redefine them here?

Review Comment:
   Is this still a TODO (or are the rest of the comments below todo)? I think this structure makes sense, and with a little more explanation of the rationale (i.e. that Statement::CreateFunction is a variant and not its own struct) it makes sense.
   
   I think the rationale is important to document so that when we upgrade sqlparser's version and it adds more CREATE FUNCTION variants, the updater knows the desired results in DataFusion.



##########
datafusion/sql/src/statement.rs:
##########
@@ -625,8 +626,136 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                 });
                 Ok(LogicalPlan::Statement(statement))
             }
+            Statement::CreateFunction {
+                or_replace,
+                temporary,
+                name,
+                args,
+                return_type,
+                params,
+            } => {
+                let return_type = match return_type {
+                    Some(t) => Some(self.convert_data_type(&t)?),
+                    None => None,
+                };
+                let mut planner_context = PlannerContext::new();
+                let empty_schema = &DFSchema::empty();
+
+                let args = match args {
+                    Some(function_args) => {
+                        let function_args = function_args
+                            .into_iter()
+                            .map(|arg| {
+                                let data_type = self.convert_data_type(&arg.data_type)?;
+
+                                let default_expr = match arg.default_expr {
+                                    Some(expr) => Some(self.sql_to_expr(
+                                        expr,
+                                        empty_schema,
+                                        &mut planner_context,
+                                    )?),
+                                    None => None,
+                                };
+                                Ok(OperateFunctionArg {
+                                    name: arg.name,
+                                    default_expr,
+                                    data_type,
+                                })
+                            })
+                            .collect::<Result<Vec<OperateFunctionArg>>>();
+                        Some(function_args?)
+                    }
+                    None => None,
+                };
+
+                // Not sure if this is the correct way to generate the name:
+                // a postgresql function definition may have a schema part as well.
+                // datafusion at the moment does a lookup based on the given string, so
+                // `schema_name.function_name` will work even if there is no `schema_name`
+                let name: String = name
+                    .0
+                    .into_iter()
+                    .map(|i| i.value)
+                    .collect::<Vec<String>>()
+                    .join(".");
+
+                //
+                // convert resulting expression to data fusion expression
+                //
+                let arg_types = args.as_ref().map(|arg| {
+                    arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
+                });
+                let mut planner_context = PlannerContext::new()
+                    .with_prepare_param_data_types(arg_types.unwrap_or_default());
+
+                let result_expression = match params.return_ {
+                    Some(r) => Some(self.sql_to_expr(
+                        r,
+                        &DFSchema::empty(),
+                        &mut planner_context,
+                    )?),
+                    None => None,
+                };
+
+                let params = CreateFunctionBody {
+                    language: params.language,
+                    behavior: params.behavior.map(|b| match b {
+                        ast::FunctionBehavior::Immutable => 
Volatility::Immutable,
+                        ast::FunctionBehavior::Stable => Volatility::Stable,
+                        ast::FunctionBehavior::Volatile => 
Volatility::Volatile,
+                    }),
+                    as_: params.as_.map(|m| m.into()),
+                    return_: result_expression,
+                };
 
-            _ => not_impl_err!("Unsupported SQL statement: {sql:?}"),
+                let statement = DdlStatement::CreateFunction(CreateFunction {
+                    or_replace,
+                    temporary,
+                    name,
+                    return_type,
+                    args,
+                    params,
+                    schema: DFSchemaRef::new(DFSchema::empty()),
+                });
+
+                Ok(LogicalPlan::Ddl(statement))
+            }
+            Statement::DropFunction {
+                if_exists,
+                func_desc,
+                ..
+            } => {
+                // Not sure if this is the correct way to generate the name

Review Comment:
   See above -- I also recommend erroring in the case of multi-part identifiers.
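   
   As a hypothetical sketch, the same guard could be applied here, assuming each `func_desc` entry exposes a sqlparser `ObjectName` via its `name` field:
   ```
   // sketch only: reject qualified names in DROP FUNCTION as well
   if let Some(desc) = func_desc.first() {
       if desc.name.0.len() > 1 {
           return not_impl_err!("Qualified function names are not supported: {}", desc.name);
       }
   }
   ```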



##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -635,6 +640,102 @@ async fn verify_udf_return_type() -> Result<()> {
     Ok(())
 }
 
+#[derive(Debug, Default)]
+struct MockFunctionFactory {
+    pub captured_expr: Mutex<Option<Expr>>,
+}
+
+#[async_trait::async_trait]
+impl FunctionFactory for MockFunctionFactory {
+    #[doc = r" Crates and registers a function from [CreateFunction] 
statement"]
+    #[must_use]
+    #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+    async fn create(
+        &self,
+        _config: &SessionConfig,
+        statement: CreateFunction,
+    ) -> datafusion::error::Result<RegisterFunction> {
+        // this function is a mock for testing
+        // `CreateFunction` should be used to derive this function
+
+        let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| {
+            let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?;
+            let base =
+                datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed");
+            let exponent =
+                datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed");
+
+            let array = base
+                .iter()
+                .zip(exponent.iter())
+                .map(|(base, exponent)| match (base, exponent) {
+                    (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)),
+                    _ => None,
+                })
+                .collect::<arrow_array::Float64Array>();
+            Ok(datafusion_expr::ColumnarValue::from(
+                Arc::new(array) as arrow_array::ArrayRef
+            ))
+        });
+
+        let args = statement.args.unwrap();
+        let mock_udf = create_udf(
+            &statement.name,
+            vec![args[0].data_type.clone(), args[1].data_type.clone()],
+            Arc::new(statement.return_type.unwrap()),
+            datafusion_expr::Volatility::Immutable,
+            mock_add,
+        );
+
+        // capture expression so we can verify
+        // it has been parsed
+        *self.captured_expr.lock() = statement.params.return_;
+
+        Ok(RegisterFunction::Scalar(Arc::new(mock_udf)))
+    }
+}
+
+#[tokio::test]
+async fn create_scalar_function_from_sql_statement() -> Result<()> {
+    let function_factory = Arc::new(MockFunctionFactory::default());
+    let runtime_config = RuntimeConfig::new();
+    let runtime_environment = RuntimeEnv::new(runtime_config).unwrap();
+    let session_config =
+        SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL");
+
+    let state =
+        SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment))
+            .with_function_factory(function_factory.clone());
+
+    let ctx = SessionContext::new_with_state(state);
+
+    let sql = r#"
+    CREATE FUNCTION better_add(DOUBLE, DOUBLE)
+        RETURNS DOUBLE
+        RETURN $1 + $2
+    "#;
+    let _ = ctx.sql(sql).await?;

Review Comment:
   Create the `better_add` function dynamically via a `CREATE FUNCTION` statement



##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -635,6 +640,102 @@ async fn verify_udf_return_type() -> Result<()> {
     Ok(())
 }
 
+#[derive(Debug, Default)]
+struct MockFunctionFactory {
+    pub captured_expr: Mutex<Option<Expr>>,
+}
+
+#[async_trait::async_trait]
+impl FunctionFactory for MockFunctionFactory {
+    #[doc = r" Crates and registers a function from [CreateFunction] 
statement"]
+    #[must_use]
+    #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+    async fn create(
+        &self,
+        _config: &SessionConfig,
+        statement: CreateFunction,
+    ) -> datafusion::error::Result<RegisterFunction> {
+        // this function is a mock for testing
+        // `CreateFunction` should be used to derive this function
+
+        let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| {
+            let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?;
+            let base =
+                datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed");
+            let exponent =
+                datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed");
+
+            let array = base
+                .iter()
+                .zip(exponent.iter())
+                .map(|(base, exponent)| match (base, exponent) {
+                    (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)),
+                    _ => None,
+                })
+                .collect::<arrow_array::Float64Array>();
+            Ok(datafusion_expr::ColumnarValue::from(
+                Arc::new(array) as arrow_array::ArrayRef
+            ))
+        });
+
+        let args = statement.args.unwrap();
+        let mock_udf = create_udf(
+            &statement.name,
+            vec![args[0].data_type.clone(), args[1].data_type.clone()],
+            Arc::new(statement.return_type.unwrap()),
+            datafusion_expr::Volatility::Immutable,
+            mock_add,
+        );
+
+        // capture expression so we can verify
+        // it has been parsed
+        *self.captured_expr.lock() = statement.params.return_;
+
+        Ok(RegisterFunction::Scalar(Arc::new(mock_udf)))
+    }
+}
+
+#[tokio::test]
+async fn create_scalar_function_from_sql_statement() -> Result<()> {
+    let function_factory = Arc::new(MockFunctionFactory::default());
+    let runtime_config = RuntimeConfig::new();
+    let runtime_environment = RuntimeEnv::new(runtime_config).unwrap();
+    let session_config =
+        SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL");
+
+    let state =
+        SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment))
+            .with_function_factory(function_factory.clone());
+
+    let ctx = SessionContext::new_with_state(state);
+
+    let sql = r#"
+    CREATE FUNCTION better_add(DOUBLE, DOUBLE)
+        RETURNS DOUBLE
+        RETURN $1 + $2
+    "#;
+    let _ = ctx.sql(sql).await?;
+
+    ctx.sql("select better_add(2.0, 2.0)").await?.show().await?;
+
+    // check that the sql expr has been converted to a datafusion expr
+    let captured_expression = function_factory.captured_expr.lock().clone().unwrap();
+
+    // is there some better way to test this?
+    assert_eq!("$1 + $2", captured_expression.to_string());
+
+    // statement drops the function
+    assert!(ctx.sql("drop function better_add").await.is_ok());
+    // no function, so it errors
+    assert!(ctx.sql("drop function better_add").await.is_err());
+    // no function, it does not care
+    assert!(ctx.sql("drop function if exists better_add").await.is_ok());
+    // query should fail as there is no function
+    assert!(ctx.sql("select better_add(2.0, 2.0)").await.is_err());
+

Review Comment:
   Would it also be possible to add a test using `SessionContext::sql_with_options` showing that if DDL is disabled, `CREATE FUNCTION` / `DROP FUNCTION` are also disabled?
   
   
   
https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql_with_options
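   
   A rough sketch of what such a test could look like (the test name is hypothetical; `SQLOptions::with_allow_ddl` is the existing switch for disabling DDL):
   ```
   #[tokio::test]
   async fn create_function_rejected_when_ddl_disabled() -> Result<()> {
       let ctx = SessionContext::new();
       // disallow DDL statements at planning time
       let options = SQLOptions::new().with_allow_ddl(false);

       let sql = "CREATE FUNCTION better_add(DOUBLE, DOUBLE) RETURNS DOUBLE RETURN $1 + $2";
       // verification should fail because DDL is not allowed
       assert!(ctx.sql_with_options(sql, options).await.is_err());
       Ok(())
   }
   ```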



##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -303,3 +321,66 @@ pub struct DropCatalogSchema {
     /// Dummy schema
     pub schema: DFSchemaRef,
 }
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunction {
+    // TODO: There is an open question: should we expose sqlparser types or redefine them here?
+    //       At the moment it makes more sense to expose sqlparser types and
+    //       leave it to the user to convert them as needed
+    pub or_replace: bool,
+    pub temporary: bool,
+    pub name: String,
+    pub args: Option<Vec<OperateFunctionArg>>,
+    pub return_type: Option<DataType>,
+    // TODO: move this to new struct here
+    pub params: CreateFunctionBody,
+    //pub body: String,
+    /// Dummy schema
+    pub schema: DFSchemaRef,
+}
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct OperateFunctionArg {
+    // it is not really supported so no need to have it here
+    // currently

Review Comment:
   I think it is fine to leave some of these fields as "TODO" items; if someone needs them they can add them in a follow-on PR
   ```suggestion
       // TODO: figure out how to support mode
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
