alamb commented on code in PR #9333:
URL: https://github.com/apache/arrow-datafusion/pull/9333#discussion_r1508163312
##########
datafusion/core/tests/user_defined/user_defined_scalar_functions.rs:
##########
@@ -619,6 +624,117 @@ async fn verify_udf_return_type() -> Result<()> {
Ok(())
}
+#[derive(Debug, Default)]
+struct MockFunctionFactory {
+ pub captured_expr: Mutex<Option<Expr>>,
+}
+
+#[async_trait::async_trait]
+impl FunctionFactory for MockFunctionFactory {
+ #[doc = r" Crates and registers a function from [CreateFunction]
statement"]
+ #[must_use]
+ #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
+ async fn create(
+ &self,
+ _config: &SessionConfig,
+ statement: CreateFunction,
+ ) -> datafusion::error::Result<RegisterFunction> {
+ // this function is a mock for testing
+ // `CreateFunction` should be used to derive this function
+
+ let mock_add = Arc::new(|args: &[datafusion_expr::ColumnarValue]| {
+ let args = datafusion_expr::ColumnarValue::values_to_arrays(args)?;
+ let base =
+
datafusion_common::cast::as_float64_array(&args[0]).expect("cast failed");
+ let exponent =
+
datafusion_common::cast::as_float64_array(&args[1]).expect("cast failed");
+
+ let array = base
+ .iter()
+ .zip(exponent.iter())
+ .map(|(base, exponent)| match (base, exponent) {
+ (Some(base), Some(exponent)) =>
Some(base.add_wrapping(exponent)),
+ _ => None,
+ })
+ .collect::<arrow_array::Float64Array>();
+ Ok(datafusion_expr::ColumnarValue::from(
+ Arc::new(array) as arrow_array::ArrayRef
+ ))
+ });
+
+ let args = statement.args.unwrap();
+ let mock_udf = create_udf(
+ &statement.name,
+ vec![args[0].data_type.clone(), args[1].data_type.clone()],
+ Arc::new(statement.return_type.unwrap()),
+ datafusion_expr::Volatility::Immutable,
+ mock_add,
+ );
+
+ // capture expression so we can verify
+ // it has been parsed
+ *self.captured_expr.lock() = statement.params.return_;
+
+ Ok(RegisterFunction::Scalar(Arc::new(mock_udf)))
+ }
+
+ async fn remove(
Review Comment:
Thinking about it more, maybe we don't even need a `remove` in the
`FunctionFactory` at all -- dropping the function is mostly handled by
`deregister_udf`?
Or maybe there is some sort of deinitialization of the function that is
needed that would be `FunctionFactory` specific 🤔. However, perhaps UDFs that
need that could implement some sort of deregistration / deallocation on `drop`.
Handling deregistration on `drop` might be better anyway, as it would prevent a
function from being deregistered out from under a running execution...
##########
datafusion/core/src/execution/context/mod.rs:
##########
@@ -794,6 +797,52 @@ impl SessionContext {
Ok(false)
}
+ async fn create_function(&self, stmt: CreateFunction) -> Result<DataFrame>
{
+ let function = {
+ let state = self.state.read().clone();
+ let function_factory = &state.function_factory;
+
+ match function_factory {
+ Some(f) => f.create(state.config(), stmt).await?,
+ _ => Err(DataFusionError::Configuration(
+ "Function factory has not been configured".into(),
+ ))?,
+ }
+ };
+
+ match function {
+ RegisterFunction::Scalar(f) => {
+ self.state.write().register_udf(f)?;
+ }
+ RegisterFunction::Aggregate(f) => {
+ self.state.write().register_udaf(f)?;
+ }
+ RegisterFunction::Window(f) => {
+ self.state.write().register_udwf(f)?;
+ }
+ RegisterFunction::Table(name, f) => self.register_udtf(&name, f),
+ };
+
+ self.return_empty_dataframe()
+ }
+
+ async fn drop_function(&self, stmt: DropFunction) -> Result<DataFrame> {
+ let _function = {
+ let state = self.state.read().clone();
+ let function_factory = &state.function_factory;
+
+ match function_factory {
+ Some(f) => f.remove(state.config(), stmt).await?,
+ None => Err(DataFusionError::Configuration(
+ "Function factory has not been configured".into(),
+ ))?,
+ }
+ };
+
+ // TODO: Once we have unregister UDF we need to implement it here
Review Comment:
I think we do have them (added relatively recently):
https://github.com/apache/arrow-datafusion/blob/ea30b93194cfdfe4148a45fc0e33549884ba81b1/datafusion/core/src/execution/context/mod.rs#L853-L865
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]