alamb commented on code in PR #9333:
URL: https://github.com/apache/arrow-datafusion/pull/9333#discussion_r1506694977


##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -495,6 +496,32 @@ impl SessionContext {
                 self.set_variable(stmt).await
             }
 
+            LogicalPlan::Statement(Statement::CreateFunction(stmt)) => {

Review Comment:
   Since CreateFunction / DropFunction modify the catalog, what do you think 
about making them part of `DdlStatement` instead of a `Statement`? 
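
A minimal sketch of the layout this suggestion implies, using simplified stand-in types rather than the real DataFusion definitions. The field layouts and the `modifies_catalog` helper are hypothetical; they only illustrate how routing through a `Ddl` variant would separate catalog changes from other statements:

```rust
// Simplified, hypothetical stand-ins for the logical plan types.
// The real `CreateFunction` / `DropFunction` structs carry more fields.
#[derive(Debug, Clone, PartialEq)]
pub struct CreateFunction {
    pub name: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct DropFunction {
    pub name: String,
}

/// Statements that change the catalog.
#[derive(Debug, Clone, PartialEq)]
pub enum DdlStatement {
    CreateFunction(CreateFunction),
    DropFunction(DropFunction),
}

/// Statements that do not change the catalog.
#[derive(Debug, Clone, PartialEq)]
pub enum Statement {
    SetVariable { variable: String, value: String },
}

#[derive(Debug, Clone, PartialEq)]
pub enum LogicalPlan {
    Ddl(DdlStatement),
    Statement(Statement),
}

/// Hypothetical helper: with this layout, a single pattern covers every
/// catalog-modifying plan.
pub fn modifies_catalog(plan: &LogicalPlan) -> bool {
    matches!(plan, LogicalPlan::Ddl(_))
}
```

With this shape, code that needs to treat catalog changes specially can match on one variant instead of listing individual statements.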



##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -495,6 +496,32 @@ impl SessionContext {
                 self.set_variable(stmt).await
             }
 
+            LogicalPlan::Statement(Statement::CreateFunction(stmt)) => {
+                let function_factory = self.state.read().function_factory.clone();
+
+                match function_factory {
+                    Some(f) => f.create(self.state.clone(), stmt).await?,
+                    None => Err(DataFusionError::Configuration(
+                        "Function factory has not been configured".into(),
+                    ))?,
+                };

Review Comment:
   Minor: I think it would be better to follow the model of `create_view` etc. 
and make this code a method on `SessionContext` (so that this code just 
dispatches to that method).
   
   The upside of doing so is that it follows the existing code pattern and 
offers a place to document `create_function` and `drop_function` publicly in 
more detail.
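
As a rough sketch of that dispatch pattern, assuming simplified synchronous stand-ins: the real code is async and works on `SessionState`, while the `Plan` enum, the `String` error type, the `functions` list and `EchoFactory` below are hypothetical and exist only for illustration:

```rust
// Hypothetical, simplified stand-ins; not the real DataFusion types.
pub struct CreateFunction {
    pub name: String,
}

pub trait FunctionFactory {
    /// Returns the name of the created function (a stand-in for a real UDF).
    fn create(&self, stmt: &CreateFunction) -> Result<String, String>;
}

pub enum Plan {
    CreateFunction(CreateFunction),
}

pub struct SessionContext {
    function_factory: Option<Box<dyn FunctionFactory>>,
    functions: Vec<String>,
}

impl SessionContext {
    pub fn new() -> Self {
        Self { function_factory: None, functions: Vec::new() }
    }

    pub fn with_function_factory(mut self, factory: Box<dyn FunctionFactory>) -> Self {
        self.function_factory = Some(factory);
        self
    }

    /// Dedicated method: the natural place for public documentation of
    /// `CREATE FUNCTION` handling.
    pub fn create_function(&mut self, stmt: CreateFunction) -> Result<(), String> {
        let factory = self
            .function_factory
            .as_ref()
            .ok_or_else(|| "Function factory has not been configured".to_string())?;
        let name = factory.create(&stmt)?;
        self.functions.push(name);
        Ok(())
    }

    /// The match arm only dispatches to the dedicated method.
    pub fn execute(&mut self, plan: Plan) -> Result<(), String> {
        match plan {
            Plan::CreateFunction(stmt) => self.create_function(stmt),
        }
    }

    pub fn has_function(&self, name: &str) -> bool {
        self.functions.iter().any(|f| f == name)
    }
}

/// Hypothetical factory that simply echoes the requested name.
pub struct EchoFactory;

impl FunctionFactory for EchoFactory {
    fn create(&self, stmt: &CreateFunction) -> Result<String, String> {
        Ok(stmt.name.clone())
    }
}
```

The match arm stays a one-liner, and the "factory not configured" error lives in one documented method.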



##########
datafusion/core/tests/sql/sql_api.rs:
##########
@@ -39,6 +52,99 @@ async fn unsupported_ddl_returns_error() {
     ctx.sql_with_options(sql, options).await.unwrap();
 }
 
+struct MockFunctionFactory;
+#[async_trait::async_trait]
+impl FunctionFactory for MockFunctionFactory {
+    #[doc = r" Crates and registers a function from [CreateFunction] statement"]
+    #[must_use]
+    #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+    async fn create(
+        &self,
+        state: Arc<RwLock<SessionState>>,
+        statement: CreateFunction,
+    ) -> datafusion::error::Result<()> {
+        // this function is a mock for testing
+        // `CreateFunction` should be used to derive this function
+
+        let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| {
+            let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?;
+            let base =
+                datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed");
+            let exponent =
+                datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed");
+
+            let array = base
+                .iter()
+                .zip(exponent.iter())
+                .map(|(base, exponent)| match (base, exponent) {
+                    (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)),
+                    _ => None,
+                })
+                .collect::<arrow_array::Float64Array>();
+            Ok(datafusion_expr::ColumnarValue::from(
+                Arc::new(array) as arrow_array::ArrayRef
+            ))
+        });
+
+        let args = statement.args.unwrap();
+        let mock_udf = create_udf(
+            &statement.name,
+            vec![args[0].data_type.clone(), args[1].data_type.clone()],
+            Arc::new(statement.return_type.unwrap()),
+            datafusion_expr::Volatility::Immutable,
+            mock_add,
+        );
+
+        // we may need other infrastructure provided by state, for example:
+        // state.config().get_extension()

Review Comment:
   🤔  maybe we should pass the config into `create` rather than `SessionContext`



##########
datafusion/expr/src/logical_plan/statement.rs:
##########
@@ -148,3 +164,32 @@ pub struct SetVariable {
     /// Dummy schema
     pub schema: DFSchemaRef,
 }
+#[derive(Clone, PartialEq, Eq, Hash, Debug)]
+pub struct CreateFunction {
+    // TODO: There is open question should we expose sqlparser types or redefine them here?

Review Comment:
   I think this approach (redefining the types) is more consistent with the 
rest of the DataFusion codebase, though it means an unfortunate amount of 
duplication



##########
datafusion/core/tests/sql/sql_api.rs:
##########
@@ -39,6 +52,99 @@ async fn unsupported_ddl_returns_error() {
     ctx.sql_with_options(sql, options).await.unwrap();
 }
 
+struct MockFunctionFactory;

Review Comment:
   This is very cool -- I think it would be great (as a follow-on PR) if we 
turned this into an example (with docs, etc., like the others in 
https://github.com/apache/arrow-datafusion/tree/main/datafusion-examples/examples)



##########
datafusion/core/tests/sql/sql_api.rs:
##########
@@ -39,6 +52,99 @@ async fn unsupported_ddl_returns_error() {
     ctx.sql_with_options(sql, options).await.unwrap();
 }
 
+struct MockFunctionFactory;
+#[async_trait::async_trait]
+impl FunctionFactory for MockFunctionFactory {
+    #[doc = r" Crates and registers a function from [CreateFunction] statement"]
+    #[must_use]
+    #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+    async fn create(
+        &self,
+        state: Arc<RwLock<SessionState>>,
+        statement: CreateFunction,
+    ) -> datafusion::error::Result<()> {
+        // this function is a mock for testing
+        // `CreateFunction` should be used to derive this function
+
+        let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| {
+            let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?;
+            let base =
+                datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed");
+            let exponent =
+                datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed");
+
+            let array = base
+                .iter()
+                .zip(exponent.iter())
+                .map(|(base, exponent)| match (base, exponent) {
+                    (Some(base), Some(exponent)) => Some(base.add_wrapping(exponent)),
+                    _ => None,
+                })
+                .collect::<arrow_array::Float64Array>();
+            Ok(datafusion_expr::ColumnarValue::from(
+                Arc::new(array) as arrow_array::ArrayRef
+            ))
+        });
+
+        let args = statement.args.unwrap();
+        let mock_udf = create_udf(
+            &statement.name,
+            vec![args[0].data_type.clone(), args[1].data_type.clone()],
+            Arc::new(statement.return_type.unwrap()),
+            datafusion_expr::Volatility::Immutable,
+            mock_add,
+        );
+
+        // we may need other infrastructure provided by state, for example:
+        // state.config().get_extension()
+
+        // register mock udf for testing
+        state.write().register_udf(mock_udf.into())?;
+        Ok(())
+    }
+
+    async fn remove(
+        &self,
+        _state: Arc<RwLock<SessionState>>,
+        _statement: DropFunction,
+    ) -> datafusion::error::Result<()> {
+        // at the moment state does not support unregister
+        // ignoring for now
+        Ok(())
+    }
+}
+
+#[tokio::test]
+async fn create_user_defined_function_statement() {

Review Comment:
   Perhaps we could put this test alongside the other user-defined tests in 
https://github.com/apache/arrow-datafusion/tree/main/datafusion/core/tests/user_defined
 so that it is easier to find



##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -1246,7 +1273,42 @@ impl QueryPlanner for DefaultQueryPlanner {
             .await
     }
 }
+/// Crates and registers a function from [CreateFunction] statement
+///
+/// It is intended to handle `CREATE FUNCTION` statements
+/// and interact with [SessionState] to registers new udfs.
+///
+/// Datafusion `SQL` dialect does not support `CREATE FUNCTION`
+/// in generic dialect, so dialect should be changed to `PostgreSQL`
+///
+/// ```rust, no_run
+/// # use datafusion::execution::config::SessionConfig;
+/// SessionConfig::new().set_str("datafusion.sql_parser.dialect", "PostgreSQL");
+/// ```
+#[async_trait]
+pub trait FunctionFactory: Sync + Send {
+    // TODO: I don't like having RwLock Leaking here, who ever implements it
+    //       has to depend ot `parking_lot`. I'f we expose &mut SessionState it
+    //       may keep lock of too long.
+    //
+    //       Not sure if there is better approach.
+    //
+
+    /// Handles creation of user defined function specified in [CreateFunction] statement
+    async fn create(
+        &self,
+        state: Arc<RwLock<SessionState>>,

Review Comment:
   It seems like `create` here handles two things:
   1. Creating the function
   2. Registering the function with the `SessionState`
   
   What if we made `create` responsible only for creating the function and made 
the caller responsible for registering it? I think that would make the API 
simpler and avoid the RwLock
   
   For example, something like
   ```rust
   /// A function defined by a FunctionFactory
   pub enum NewFunction {
       Scalar(ScalarUDF),
       Aggregate(AggregateUDF),
       Window(WindowUDF),
   }
   
   pub trait FunctionFactory: Sync + Send {
       // ...
       async fn create(
           &self,
           statement: CreateFunction,
       ) -> Result<NewFunction>;
       // ...
   }
   ```
   
   
   
   
   Another alternative: rather than passing the `SessionState`, we could pass 
a `&mut dyn FunctionRegistry`, which would also avoid exposing 
the RwLock 🤔 
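
A minimal sketch of this second alternative, under stated assumptions: everything here is a simplified, synchronous stand-in (the `FunctionRegistry` trait below is a toy trait modelled loosely on the idea, not the DataFusion trait; `MockUdf`, `MapRegistry` and `handle_create_function` are hypothetical names, and `std::sync::RwLock` replaces `parking_lot`). It shows that the factory only sees a registry reference while the caller keeps the lock to itself:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical stand-in for a user-defined function.
#[derive(Debug, Clone, PartialEq)]
pub struct MockUdf {
    pub name: String,
}

pub struct CreateFunction {
    pub name: String,
}

/// Toy registry trait; returns the previously registered function, if any.
pub trait FunctionRegistry {
    fn register_udf(&mut self, udf: MockUdf) -> Result<Option<MockUdf>, String>;
}

/// The factory no longer sees the lock or the full session state.
pub trait FunctionFactory {
    fn create(
        &self,
        registry: &mut dyn FunctionRegistry,
        statement: CreateFunction,
    ) -> Result<(), String>;
}

/// Hypothetical stand-in for the session state.
#[derive(Default)]
pub struct MapRegistry {
    udfs: HashMap<String, MockUdf>,
}

impl MapRegistry {
    pub fn contains(&self, name: &str) -> bool {
        self.udfs.contains_key(name)
    }
}

impl FunctionRegistry for MapRegistry {
    fn register_udf(&mut self, udf: MockUdf) -> Result<Option<MockUdf>, String> {
        Ok(self.udfs.insert(udf.name.clone(), udf))
    }
}

pub struct MockFunctionFactory;

impl FunctionFactory for MockFunctionFactory {
    fn create(
        &self,
        registry: &mut dyn FunctionRegistry,
        statement: CreateFunction,
    ) -> Result<(), String> {
        registry.register_udf(MockUdf { name: statement.name })?;
        Ok(())
    }
}

/// The caller owns the lock; implementors never name a lock type.
pub fn handle_create_function(
    state: &RwLock<MapRegistry>,
    factory: &dyn FunctionFactory,
    statement: CreateFunction,
) -> Result<(), String> {
    let mut guard = state.write().map_err(|e| e.to_string())?;
    factory.create(&mut *guard, statement)
}
```

One trade-off in this sketch: the write lock is held for the whole `create` call, which is the lock-duration concern raised in the TODO above. Returning the new function and registering it afterwards, as in the first alternative, would keep the critical section shorter.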



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
