alamb commented on code in PR #9333:
URL: https://github.com/apache/arrow-datafusion/pull/9333#discussion_r1509945936
##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -635,6 +640,102 @@ async fn verify_udf_return_type() -> Result<()> {
Ok(())
}
+#[derive(Debug, Default)]
+struct MockFunctionFactory {
+ pub captured_expr: Mutex<Option<Expr>>,
+}
+
+#[async_trait::async_trait]
+impl FunctionFactory for MockFunctionFactory {
+ #[doc = r" Crates and registers a function from [CreateFunction]
statement"]
+ #[must_use]
+ #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+ async fn create(
+ &self,
+ _config: &SessionConfig,
+ statement: CreateFunction,
+ ) -> datafusion::error::Result<RegisterFunction> {
+ // this function is a mock for testing
+ // `CreateFunction` should be used to derive this function
+
+ let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| {
+ let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?;
+ let base = datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed");
+ let exponent = datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed");
+
+ let array = base
+ .iter()
+ .zip(exponent.iter())
+ .map(|(base, exponent)| match (base, exponent) {
+ (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)),
+ _ => None,
+ })
+ .collect::<arrow_array::Float64Array>();
+ Ok(datafusion_expr::ColumnarValue::from(
+ Arc::new(array) as arrow_array::ArrayRef
+ ))
+ });
+
+ let args = statement.args.unwrap();
+ let mock_udf = create_udf(
+ &statement.name,
+ vec![args[0].data_type.clone(), args[1].data_type.clone()],
+ Arc::new(statement.return_type.unwrap()),
+ datafusion_expr::Volatility::Immutable,
+ mock_add,
+ );
+
+ // capture expression so we can verify
+ // it has been parsed
+ *self.captured_expr.lock() = statement.params.return_;
+
+ Ok(RegisterFunction::Scalar(Arc::new(mock_udf)))
+ }
+}
+
+#[tokio::test]
+async fn create_scalar_function_from_sql_statement() -> Result<()> {
+ let function_factory = Arc::new(MockFunctionFactory::default());
+ let runtime_config = RuntimeConfig::new();
+ let runtime_environment = RuntimeEnv::new(runtime_config).unwrap();
+ let session_config =
+ SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL");
+
+ let state =
+ SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment))
+ .with_function_factory(function_factory.clone());
+
+ let ctx = SessionContext::new_with_state(state);
+
+ let sql = r#"
+ CREATE FUNCTION better_add(DOUBLE, DOUBLE)
+ RETURNS DOUBLE
+ RETURN $1 + $2
+ "#;
+ let _ = ctx.sql(sql).await?;
+
+ ctx.sql("select better_add(2.0, 2.0)").await?.show().await?;
+
+ // check that the sql expr has been converted to a datafusion expr
+ let captured_expression = function_factory.captured_expr.lock().clone().unwrap();
+
+ // is there some better way to test this?
Review Comment:
I think this is fine
##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -303,3 +321,66 @@ pub struct DropCatalogSchema {
/// Dummy schema
pub schema: DFSchemaRef,
}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunction {
+ // TODO: there is an open question: should we expose sqlparser types or redefine them here?
+ // At the moment it makes more sense to expose sqlparser types and leave
+ // it to the user to convert them as needed
+ pub or_replace: bool,
+ pub temporary: bool,
+ pub name: String,
+ pub args: Option<Vec<OperateFunctionArg>>,
+ pub return_type: Option<DataType>,
+ // TODO: move this to new struct here
+ pub params: CreateFunctionBody,
+ //pub body: String,
+ /// Dummy schema
+ pub schema: DFSchemaRef,
Review Comment:
Why is this needed?
##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -635,6 +640,102 @@ async fn verify_udf_return_type() -> Result<()> {
Ok(())
}
+#[derive(Debug, Default)]
+struct MockFunctionFactory {
+ pub captured_expr: Mutex<Option<Expr>>,
+}
+
+#[async_trait::async_trait]
+impl FunctionFactory for MockFunctionFactory {
+ #[doc = r" Crates and registers a function from [CreateFunction]
statement"]
+ #[must_use]
+ #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+ async fn create(
+ &self,
+ _config: &SessionConfig,
+ statement: CreateFunction,
+ ) -> datafusion::error::Result<RegisterFunction> {
+ // this function is a mock for testing
+ // `CreateFunction` should be used to derive this function
+
Review Comment:
```suggestion
// In this example, we always create a function that adds its arguments
// with the name specified in `CREATE FUNCTION`. In a real implementation
// the body of the created UDF would also likely be a function of the contents
// of the `CreateFunction`
```
##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -303,3 +321,66 @@ pub struct DropCatalogSchema {
/// Dummy schema
pub schema: DFSchemaRef,
}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunction {
+ // TODO: there is an open question: should we expose sqlparser types or redefine them here?
+ // At the moment it makes more sense to expose sqlparser types and leave
+ // it to the user to convert them as needed
+ pub or_replace: bool,
+ pub temporary: bool,
+ pub name: String,
+ pub args: Option<Vec<OperateFunctionArg>>,
+ pub return_type: Option<DataType>,
+ // TODO: move this to new struct here
+ pub params: CreateFunctionBody,
+ //pub body: String,
+ /// Dummy schema
+ pub schema: DFSchemaRef,
+}
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct OperateFunctionArg {
+ // it is not really supported currently, so no need to have it here
+ // pub mode: Option<ArgMode>,
+ pub name: Option<Ident>,
+ pub data_type: DataType,
+ pub default_expr: Option<Expr>,
+}
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunctionBody {
+ /// LANGUAGE lang_name
+ pub language: Option<Ident>,
+ /// IMMUTABLE | STABLE | VOLATILE
+ pub behavior: Option<Volatility>,
+ /// AS 'definition'
+ pub as_: Option<DefinitionStatement>,
+ /// RETURN expression
+ pub return_: Option<Expr>,
+}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub enum DefinitionStatement {
+ SingleQuotedDef(String),
+ DoubleDollarDef(String),
+}
+
+impl From<sqlparser::ast::FunctionDefinition> for DefinitionStatement {
+ fn from(value: sqlparser::ast::FunctionDefinition) -> Self {
+ match value {
+ sqlparser::ast::FunctionDefinition::SingleQuotedDef(s) => {
+ Self::SingleQuotedDef(s)
+ }
+ sqlparser::ast::FunctionDefinition::DoubleDollarDef(s) => {
+ Self::DoubleDollarDef(s)
+ }
+ }
+ }
+}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct DropFunction {
+ pub name: String,
+ pub if_exists: bool,
+ pub schema: DFSchemaRef,
Review Comment:
Does this really need a schema?
##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -1568,6 +1650,16 @@ impl SessionState {
self
}
+ /// Registers `CREATE FUNCTION` statement handler implementing
+ /// [`FunctionFactory`] trait.
Review Comment:
```suggestion
/// Registers a [`FunctionFactory`] to handle `CREATE FUNCTION` statements
```
##########
datafusion/sql/src/statement.rs:
##########
@@ -625,8 +626,136 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
});
Ok(LogicalPlan::Statement(statement))
}
+ Statement::CreateFunction {
+ or_replace,
+ temporary,
+ name,
+ args,
+ return_type,
+ params,
+ } => {
+ let return_type = match return_type {
+ Some(t) => Some(self.convert_data_type(&t)?),
+ None => None,
+ };
+ let mut planner_context = PlannerContext::new();
+ let empty_schema = &DFSchema::empty();
+
+ let args = match args {
+ Some(function_args) => {
+ let function_args = function_args
+ .into_iter()
+ .map(|arg| {
+ let data_type = self.convert_data_type(&arg.data_type)?;
+
+ let default_expr = match arg.default_expr {
+ Some(expr) => Some(self.sql_to_expr(
+ expr,
+ empty_schema,
+ &mut planner_context,
+ )?),
+ None => None,
+ };
+ Ok(OperateFunctionArg {
+ name: arg.name,
+ default_expr,
+ data_type,
+ })
+ })
+ .collect::<Result<Vec<OperateFunctionArg>>>();
+ Some(function_args?)
+ }
+ None => None,
+ };
+
+ // Not sure if this is the correct way to generate the name:
+ // a postgresql function definition may have a schema part as well.
+ // datafusion at the moment does lookup based on the given string;
+ // `schema_name.function_name` will work even if there is no `schema_name`
+ let name: String = name
+ .0
+ .into_iter()
+ .map(|i| i.value)
+ .collect::<Vec<String>>()
+ .join(".");
Review Comment:
I recommend in this case that we throw an error if someone tries to create a
qualified function; if it becomes important to support qualified function names
in the future we can revisit the issue then.
So
```
CREATE FUNCTION foo
```
would work, but
```
CREATE FUNCTION my.foo ...
```
would error (with a not yet implemented error).
This is the corresponding code for table references:
https://github.com/apache/arrow-datafusion/blob/dd4263f843e093c807d63edf73a571b1ba2669b5/datafusion/common/src/column.rs#L71-L99
and the relevant type is
https://docs.rs/datafusion/latest/datafusion/sql/enum.TableReference.html
But at the moment DataFusion doesn't put functions in a schema or catalog;
it simply puts them in a global namespace.
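Something like the following sketch could implement that check (a sketch only: it assumes sqlparser's `ObjectName` wraps a `Vec<Ident>`, and uses the `plan_err!` / `not_impl_err!` macros from `datafusion_common`):
```rust
use datafusion_common::{not_impl_err, plan_err};

// Reject qualified names like `my_schema.foo` until they are actually supported
let name = match name.0.as_slice() {
    [] => return plan_err!("Function should have a name"),
    [fn_name] => fn_name.value.clone(),
    // multi-part identifier => not yet implemented
    [..] => return not_impl_err!("Qualified function names are not supported: {name}"),
};
```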
##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -303,3 +321,66 @@ pub struct DropCatalogSchema {
/// Dummy schema
pub schema: DFSchemaRef,
}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunction {
Review Comment:
```suggestion
/// Arguments passed to `CREATE FUNCTION`
///
/// Note this is meant to be the same as sqlparser's [`sqlparser::ast::Statement::CreateFunction`]
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct CreateFunction {
```
##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -303,3 +321,66 @@ pub struct DropCatalogSchema {
/// Dummy schema
pub schema: DFSchemaRef,
}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunction {
+ // TODO: there is an open question: should we expose sqlparser types or redefine them here?
Review Comment:
Is this still a TODO (or are the rest of the comments below todos)? I think
this structure makes sense, and with a little more explanation of the rationale
(aka that Statement::CreateFunction is a variant and not its own struct) it
makes sense.
I think the rationale would be important so that when we upgrade sqlparser's
version and it adds more CREATE FUNCTION variants, the updater knows the
desired results in DataFusion.
##########
datafusion/sql/src/statement.rs:
##########
@@ -625,8 +626,136 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
});
Ok(LogicalPlan::Statement(statement))
}
+ Statement::CreateFunction {
+ or_replace,
+ temporary,
+ name,
+ args,
+ return_type,
+ params,
+ } => {
+ let return_type = match return_type {
+ Some(t) => Some(self.convert_data_type(&t)?),
+ None => None,
+ };
+ let mut planner_context = PlannerContext::new();
+ let empty_schema = &DFSchema::empty();
+
+ let args = match args {
+ Some(function_args) => {
+ let function_args = function_args
+ .into_iter()
+ .map(|arg| {
+ let data_type = self.convert_data_type(&arg.data_type)?;
+
+ let default_expr = match arg.default_expr {
+ Some(expr) => Some(self.sql_to_expr(
+ expr,
+ empty_schema,
+ &mut planner_context,
+ )?),
+ None => None,
+ };
+ Ok(OperateFunctionArg {
+ name: arg.name,
+ default_expr,
+ data_type,
+ })
+ })
+ .collect::<Result<Vec<OperateFunctionArg>>>();
+ Some(function_args?)
+ }
+ None => None,
+ };
+
+ // Not sure if this is the correct way to generate the name:
+ // a postgresql function definition may have a schema part as well.
+ // datafusion at the moment does lookup based on the given string;
+ // `schema_name.function_name` will work even if there is no `schema_name`
+ let name: String = name
+ .0
+ .into_iter()
+ .map(|i| i.value)
+ .collect::<Vec<String>>()
+ .join(".");
+
+ //
+ // convert the resulting expression to a datafusion expression
+ //
+ let arg_types = args.as_ref().map(|arg| {
+ arg.iter().map(|t| t.data_type.clone()).collect::<Vec<_>>()
+ });
+ let mut planner_context = PlannerContext::new()
+ .with_prepare_param_data_types(arg_types.unwrap_or_default());
+
+ let result_expression = match params.return_ {
+ Some(r) => Some(self.sql_to_expr(
+ r,
+ &DFSchema::empty(),
+ &mut planner_context,
+ )?),
+ None => None,
+ };
+
+ let params = CreateFunctionBody {
+ language: params.language,
+ behavior: params.behavior.map(|b| match b {
+ ast::FunctionBehavior::Immutable => Volatility::Immutable,
+ ast::FunctionBehavior::Stable => Volatility::Stable,
+ ast::FunctionBehavior::Volatile => Volatility::Volatile,
+ }),
+ as_: params.as_.map(|m| m.into()),
+ return_: result_expression,
+ };
- _ => not_impl_err!("Unsupported SQL statement: {sql:?}"),
+ let statement = DdlStatement::CreateFunction(CreateFunction {
+ or_replace,
+ temporary,
+ name,
+ return_type,
+ args,
+ params,
+ schema: DFSchemaRef::new(DFSchema::empty()),
+ });
+
+ Ok(LogicalPlan::Ddl(statement))
+ }
+ Statement::DropFunction {
+ if_exists,
+ func_desc,
+ ..
+ } => {
+ // Not sure if this is the correct way to generate the name
Review Comment:
See above -- I also recommend erroring in the case of multi-part
identifiers.
##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -635,6 +640,102 @@ async fn verify_udf_return_type() -> Result<()> {
Ok(())
}
+#[derive(Debug, Default)]
+struct MockFunctionFactory {
+ pub captured_expr: Mutex<Option<Expr>>,
+}
+
+#[async_trait::async_trait]
+impl FunctionFactory for MockFunctionFactory {
+ #[doc = r" Crates and registers a function from [CreateFunction]
statement"]
+ #[must_use]
+ #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+ async fn create(
+ &self,
+ _config: &SessionConfig,
+ statement: CreateFunction,
+ ) -> datafusion::error::Result<RegisterFunction> {
+ // this function is a mock for testing
+ // `CreateFunction` should be used to derive this function
+
+ let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| {
+ let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?;
+ let base = datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed");
+ let exponent = datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed");
+
+ let array = base
+ .iter()
+ .zip(exponent.iter())
+ .map(|(base, exponent)| match (base, exponent) {
+ (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)),
+ _ => None,
+ })
+ .collect::<arrow_array::Float64Array>();
+ Ok(datafusion_expr::ColumnarValue::from(
+ Arc::new(array) as arrow_array::ArrayRef
+ ))
+ });
+
+ let args = statement.args.unwrap();
+ let mock_udf = create_udf(
+ &statement.name,
+ vec![args[0].data_type.clone(), args[1].data_type.clone()],
+ Arc::new(statement.return_type.unwrap()),
+ datafusion_expr::Volatility::Immutable,
+ mock_add,
+ );
+
+ // capture expression so we can verify
+ // it has been parsed
+ *self.captured_expr.lock() = statement.params.return_;
+
+ Ok(RegisterFunction::Scalar(Arc::new(mock_udf)))
+ }
+}
+
+#[tokio::test]
+async fn create_scalar_function_from_sql_statement() -> Result<()> {
+ let function_factory = Arc::new(MockFunctionFactory::default());
+ let runtime_config = RuntimeConfig::new();
+ let runtime_environment = RuntimeEnv::new(runtime_config).unwrap();
+ let session_config =
+ SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL");
+
+ let state =
+ SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment))
+ .with_function_factory(function_factory.clone());
+
+ let ctx = SessionContext::new_with_state(state);
+
+ let sql = r#"
+ CREATE FUNCTION better_add(DOUBLE, DOUBLE)
+ RETURNS DOUBLE
+ RETURN $1 + $2
+ "#;
+ let _ = ctx.sql(sql).await?;
Review Comment:
Create the `better_add` function dynamically via a `CREATE FUNCTION` statement
##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -635,6 +640,102 @@ async fn verify_udf_return_type() -> Result<()> {
Ok(())
}
+#[derive(Debug, Default)]
+struct MockFunctionFactory {
+ pub captured_expr: Mutex<Option<Expr>>,
+}
+
+#[async_trait::async_trait]
+impl FunctionFactory for MockFunctionFactory {
+ #[doc = r" Crates and registers a function from [CreateFunction]
statement"]
+ #[must_use]
+ #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+ async fn create(
+ &self,
+ _config: &SessionConfig,
+ statement: CreateFunction,
+ ) -> datafusion::error::Result<RegisterFunction> {
+ // this function is a mock for testing
+ // `CreateFunction` should be used to derive this function
+
+ let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| {
+ let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?;
+ let base = datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed");
+ let exponent = datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed");
+
+ let array = base
+ .iter()
+ .zip(exponent.iter())
+ .map(|(base, exponent)| match (base, exponent) {
+ (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)),
+ _ => None,
+ })
+ .collect::<arrow_array::Float64Array>();
+ Ok(datafusion_expr::ColumnarValue::from(
+ Arc::new(array) as arrow_array::ArrayRef
+ ))
+ });
+
+ let args = statement.args.unwrap();
+ let mock_udf = create_udf(
+ &statement.name,
+ vec![args[0].data_type.clone(), args[1].data_type.clone()],
+ Arc::new(statement.return_type.unwrap()),
+ datafusion_expr::Volatility::Immutable,
+ mock_add,
+ );
+
+ // capture expression so we can verify
+ // it has been parsed
+ *self.captured_expr.lock() = statement.params.return_;
+
+ Ok(RegisterFunction::Scalar(Arc::new(mock_udf)))
+ }
+}
+
+#[tokio::test]
+async fn create_scalar_function_from_sql_statement() -> Result<()> {
+ let function_factory = Arc::new(MockFunctionFactory::default());
+ let runtime_config = RuntimeConfig::new();
+ let runtime_environment = RuntimeEnv::new(runtime_config).unwrap();
+ let session_config =
+ SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL");
+
+ let state =
+ SessionState::new_with_config_rt(session_config, Arc::new(runtime_environment))
+ .with_function_factory(function_factory.clone());
+
+ let ctx = SessionContext::new_with_state(state);
+
+ let sql = r#"
+ CREATE FUNCTION better_add(DOUBLE, DOUBLE)
+ RETURNS DOUBLE
+ RETURN $1 + $2
+ "#;
+ let _ = ctx.sql(sql).await?;
+
+ ctx.sql("select better_add(2.0, 2.0)").await?.show().await?;
+
+ // check that the sql expr has been converted to a datafusion expr
+ let captured_expression = function_factory.captured_expr.lock().clone().unwrap();
+
+ // is there some better way to test this?
+ assert_eq!("$1 + $2", captured_expression.to_string());
+
+ // statement drops function
+ assert!(ctx.sql("drop function better_add").await.is_ok());
+ // the function is gone, so dropping it again errors
+ assert!(ctx.sql("drop function better_add").await.is_err());
+ // with IF EXISTS, a missing function is not an error
+ assert!(ctx.sql("drop function if exists better_add").await.is_ok());
+ // query should fail as there is no function
+ assert!(ctx.sql("select better_add(2.0, 2.0)").await.is_err());
+
Review Comment:
Would it also be possible to add a test using
`SessionContext::sql_with_options` showing that if DDL is disabled, CREATE /
DROP FUNCTION is also disabled?
https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#method.sql_with_options
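A minimal sketch of such a test (assuming the `SQLOptions` builder API linked above and the `ctx` built earlier in this test):
```rust
use datafusion::execution::context::SQLOptions;

// with DDL disabled, CREATE FUNCTION and DROP FUNCTION should both be rejected
let options = SQLOptions::new().with_allow_ddl(false);
let sql = "CREATE FUNCTION better_add(DOUBLE, DOUBLE) RETURNS DOUBLE RETURN $1 + $2";
assert!(ctx.sql_with_options(sql, options.clone()).await.is_err());
assert!(ctx
    .sql_with_options("DROP FUNCTION better_add", options)
    .await
    .is_err());
```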
##########
datafusion/expr/src/logical_plan/ddl.rs:
##########
@@ -303,3 +321,66 @@ pub struct DropCatalogSchema {
/// Dummy schema
pub schema: DFSchemaRef,
}
+
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunction {
+ // TODO: there is an open question: should we expose sqlparser types or redefine them here?
+ // At the moment it makes more sense to expose sqlparser types and leave
+ // it to the user to convert them as needed
+ pub or_replace: bool,
+ pub temporary: bool,
+ pub name: String,
+ pub args: Option<Vec<OperateFunctionArg>>,
+ pub return_type: Option<DataType>,
+ // TODO: move this to new struct here
+ pub params: CreateFunctionBody,
+ //pub body: String,
+ /// Dummy schema
+ pub schema: DFSchemaRef,
+}
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct OperateFunctionArg {
+ // it is not really supported currently, so no need to have it here
Review Comment:
I think it is fine to leave some of these fields as "TODO" items; if
someone needs them they can add them in a follow-on PR
```suggestion
// TODO: figure out how to support mode
```