This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fe3b101f54 Allow type coercion of zero input arrays to nullary (#15487)
fe3b101f54 is described below
commit fe3b101f549f3db14e44eee71e609ef393f86b33
Author: Tim Saucer <[email protected]>
AuthorDate: Mon Mar 31 10:01:23 2025 -0400
Allow type coercion of zero input arrays to nullary (#15487)
* ScalarUDF via FFI would break with nullary type inputs. We could not
coerce empty vector of arguments to nullary due to check. This also exposes
return_type_from_args instead of just return_type that was causing some UDFs to
fail. Added unit tests and moved around FFI test modules a little.
* Add license text
* Correct error in documentation
* Error message changed in test due to updated scalar coercion
* Perform check of user defined types when looking for empty argument types
* Updated error messages during unit test
---
datafusion/expr/src/type_coercion/functions.rs | 8 +-
datafusion/ffi/src/tests/mod.rs | 6 +-
datafusion/ffi/src/tests/udf_udaf_udwf.rs | 11 +-
datafusion/ffi/src/tests/utils.rs | 87 ++++++++++++++
datafusion/ffi/src/{udf.rs => udf/mod.rs} | 47 +++++++-
datafusion/ffi/src/udf/return_info.rs | 53 +++++++++
datafusion/ffi/src/udf/return_type_args.rs | 142 +++++++++++++++++++++++
datafusion/ffi/tests/ffi_integration.rs | 116 +-----------------
datafusion/ffi/tests/ffi_udf.rs | 104 +++++++++++++++++
datafusion/sql/tests/sql_integration.rs | 4 +-
datafusion/sqllogictest/test_files/functions.slt | 4 +-
11 files changed, 458 insertions(+), 124 deletions(-)
diff --git a/datafusion/expr/src/type_coercion/functions.rs
b/datafusion/expr/src/type_coercion/functions.rs
index 0ec017bdc2..3b34718062 100644
--- a/datafusion/expr/src/type_coercion/functions.rs
+++ b/datafusion/expr/src/type_coercion/functions.rs
@@ -49,7 +49,7 @@ pub fn data_types_with_scalar_udf(
let signature = func.signature();
let type_signature = &signature.type_signature;
- if current_types.is_empty() {
+ if current_types.is_empty() && type_signature !=
&TypeSignature::UserDefined {
if type_signature.supports_zero_argument() {
return Ok(vec![]);
} else if type_signature.used_to_support_zero_arguments() {
@@ -87,7 +87,7 @@ pub fn data_types_with_aggregate_udf(
let signature = func.signature();
let type_signature = &signature.type_signature;
- if current_types.is_empty() {
+ if current_types.is_empty() && type_signature !=
&TypeSignature::UserDefined {
if type_signature.supports_zero_argument() {
return Ok(vec![]);
} else if type_signature.used_to_support_zero_arguments() {
@@ -124,7 +124,7 @@ pub fn data_types_with_window_udf(
let signature = func.signature();
let type_signature = &signature.type_signature;
- if current_types.is_empty() {
+ if current_types.is_empty() && type_signature !=
&TypeSignature::UserDefined {
if type_signature.supports_zero_argument() {
return Ok(vec![]);
} else if type_signature.used_to_support_zero_arguments() {
@@ -161,7 +161,7 @@ pub fn data_types(
) -> Result<Vec<DataType>> {
let type_signature = &signature.type_signature;
- if current_types.is_empty() {
+ if current_types.is_empty() && type_signature !=
&TypeSignature::UserDefined {
if type_signature.supports_zero_argument() {
return Ok(vec![]);
} else if type_signature.used_to_support_zero_arguments() {
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index 4b4a29276d..c7a9816431 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -37,12 +37,13 @@ use datafusion::{
common::record_batch,
};
use sync_provider::create_sync_table_provider;
-use udf_udaf_udwf::create_ffi_abs_func;
+use udf_udaf_udwf::{create_ffi_abs_func, create_ffi_random_func};
mod async_provider;
pub mod catalog;
mod sync_provider;
mod udf_udaf_udwf;
+pub mod utils;
#[repr(C)]
#[derive(StableAbi)]
@@ -60,6 +61,8 @@ pub struct ForeignLibraryModule {
/// Create a scalar UDF
pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF,
+ pub create_nullary_udf: extern "C" fn() -> FFI_ScalarUDF,
+
pub version: extern "C" fn() -> u64,
}
@@ -105,6 +108,7 @@ pub fn get_foreign_library_module() ->
ForeignLibraryModuleRef {
create_catalog: create_catalog_provider,
create_table: construct_table_provider,
create_scalar_udf: create_ffi_abs_func,
+ create_nullary_udf: create_ffi_random_func,
version: super::version,
}
.leak_into_prefix()
diff --git a/datafusion/ffi/src/tests/udf_udaf_udwf.rs
b/datafusion/ffi/src/tests/udf_udaf_udwf.rs
index e8a13aac13..b40bec762b 100644
--- a/datafusion/ffi/src/tests/udf_udaf_udwf.rs
+++ b/datafusion/ffi/src/tests/udf_udaf_udwf.rs
@@ -16,7 +16,10 @@
// under the License.
use crate::udf::FFI_ScalarUDF;
-use datafusion::{functions::math::abs::AbsFunc, logical_expr::ScalarUDF};
+use datafusion::{
+ functions::math::{abs::AbsFunc, random::RandomFunc},
+ logical_expr::ScalarUDF,
+};
use std::sync::Arc;
@@ -25,3 +28,9 @@ pub(crate) extern "C" fn create_ffi_abs_func() ->
FFI_ScalarUDF {
udf.into()
}
+
+pub(crate) extern "C" fn create_ffi_random_func() -> FFI_ScalarUDF {
+ let udf: Arc<ScalarUDF> = Arc::new(RandomFunc::new().into());
+
+ udf.into()
+}
diff --git a/datafusion/ffi/src/tests/utils.rs
b/datafusion/ffi/src/tests/utils.rs
new file mode 100644
index 0000000000..6465b17d9b
--- /dev/null
+++ b/datafusion/ffi/src/tests/utils.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::tests::ForeignLibraryModuleRef;
+use abi_stable::library::RootModule;
+use datafusion::error::{DataFusionError, Result};
+use std::path::Path;
+
+/// Compute the path to the library. It would be preferable to simply use
+/// abi_stable::library::development_utils::compute_library_path however
+/// our current CI pipeline has a `ci` profile that we need to use to
+/// find the library.
+pub fn compute_library_path<M: RootModule>(
+ target_path: &Path,
+) -> std::io::Result<std::path::PathBuf> {
+ let debug_dir = target_path.join("debug");
+ let release_dir = target_path.join("release");
+ let ci_dir = target_path.join("ci");
+
+ let debug_path = M::get_library_path(&debug_dir.join("deps"));
+ let release_path = M::get_library_path(&release_dir.join("deps"));
+ let ci_path = M::get_library_path(&ci_dir.join("deps"));
+
+ let all_paths = vec![
+ (debug_dir.clone(), debug_path),
+ (release_dir, release_path),
+ (ci_dir, ci_path),
+ ];
+
+ let best_path = all_paths
+ .into_iter()
+ .filter(|(_, path)| path.exists())
+ .filter_map(|(dir, path)| path.metadata().map(|m| (dir, m)).ok())
+ .filter_map(|(dir, meta)| meta.modified().map(|m| (dir, m)).ok())
+ .max_by_key(|(_, date)| *date)
+ .map(|(dir, _)| dir)
+ .unwrap_or(debug_dir);
+
+ Ok(best_path)
+}
+
+pub fn get_module() -> Result<ForeignLibraryModuleRef> {
+ let expected_version = crate::version();
+
+ let crate_root = Path::new(env!("CARGO_MANIFEST_DIR"));
+ let target_dir = crate_root
+ .parent()
+ .expect("Failed to find crate parent")
+ .parent()
+ .expect("Failed to find workspace root")
+ .join("target");
+
+ // Find the location of the library. This is specific to the build
environment,
+ // so you will need to change the approach here based on your use case.
+ // let target: &std::path::Path = "../../../../target/".as_ref();
+ let library_path =
+ compute_library_path::<ForeignLibraryModuleRef>(target_dir.as_path())
+ .map_err(|e| DataFusionError::External(Box::new(e)))?
+ .join("deps");
+
+ // Load the module
+ let module = ForeignLibraryModuleRef::load_from_directory(&library_path)
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+ assert_eq!(
+ module
+ .version()
+ .expect("Unable to call version on FFI module")(),
+ expected_version
+ );
+
+ Ok(module)
+}
diff --git a/datafusion/ffi/src/udf.rs b/datafusion/ffi/src/udf/mod.rs
similarity index 87%
rename from datafusion/ffi/src/udf.rs
rename to datafusion/ffi/src/udf/mod.rs
index bbc9cf936c..706b9fabed 100644
--- a/datafusion/ffi/src/udf.rs
+++ b/datafusion/ffi/src/udf/mod.rs
@@ -29,7 +29,9 @@ use arrow::{
};
use datafusion::{
error::DataFusionError,
- logical_expr::type_coercion::functions::data_types_with_scalar_udf,
+ logical_expr::{
+ type_coercion::functions::data_types_with_scalar_udf, ReturnInfo,
ReturnTypeArgs,
+ },
};
use datafusion::{
error::Result,
@@ -37,6 +39,10 @@ use datafusion::{
ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
},
};
+use return_info::FFI_ReturnInfo;
+use return_type_args::{
+ FFI_ReturnTypeArgs, ForeignReturnTypeArgs, ForeignReturnTypeArgsOwned,
+};
use crate::{
arrow_wrappers::{WrappedArray, WrappedSchema},
@@ -45,6 +51,9 @@ use crate::{
volatility::FFI_Volatility,
};
+pub mod return_info;
+pub mod return_type_args;
+
/// A stable struct for sharing a [`ScalarUDF`] across FFI boundaries.
#[repr(C)]
#[derive(Debug, StableAbi)]
@@ -66,6 +75,14 @@ pub struct FFI_ScalarUDF {
arg_types: RVec<WrappedSchema>,
) -> RResult<WrappedSchema, RString>,
+ /// Determines the return info of the underlying [`ScalarUDF`]. Either this
+ /// or return_type may be implemented on a UDF.
+ pub return_type_from_args: unsafe extern "C" fn(
+ udf: &Self,
+ args: FFI_ReturnTypeArgs,
+ )
+ -> RResult<FFI_ReturnInfo, RString>,
+
/// Execute the underlying [`ScalarUDF`] and return the result as a
`FFI_ArrowArray`
/// within an AbiStable wrapper.
pub invoke_with_args: unsafe extern "C" fn(
@@ -123,6 +140,23 @@ unsafe extern "C" fn return_type_fn_wrapper(
rresult!(return_type)
}
+unsafe extern "C" fn return_type_from_args_fn_wrapper(
+ udf: &FFI_ScalarUDF,
+ args: FFI_ReturnTypeArgs,
+) -> RResult<FFI_ReturnInfo, RString> {
+ let private_data = udf.private_data as *const ScalarUDFPrivateData;
+ let udf = &(*private_data).udf;
+
+ let args: ForeignReturnTypeArgsOwned = rresult_return!((&args).try_into());
+ let args_ref: ForeignReturnTypeArgs = (&args).into();
+
+ let return_type = udf
+ .return_type_from_args((&args_ref).into())
+ .and_then(FFI_ReturnInfo::try_from);
+
+ rresult!(return_type)
+}
+
unsafe extern "C" fn coerce_types_fn_wrapper(
udf: &FFI_ScalarUDF,
arg_types: RVec<WrappedSchema>,
@@ -209,6 +243,7 @@ impl From<Arc<ScalarUDF>> for FFI_ScalarUDF {
short_circuits,
invoke_with_args: invoke_with_args_fn_wrapper,
return_type: return_type_fn_wrapper,
+ return_type_from_args: return_type_from_args_fn_wrapper,
coerce_types: coerce_types_fn_wrapper,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
@@ -281,6 +316,16 @@ impl ScalarUDFImpl for ForeignScalarUDF {
result.and_then(|r| (&r.0).try_into().map_err(DataFusionError::from))
}
+ fn return_type_from_args(&self, args: ReturnTypeArgs) ->
Result<ReturnInfo> {
+ let args: FFI_ReturnTypeArgs = args.try_into()?;
+
+ let result = unsafe { (self.udf.return_type_from_args)(&self.udf,
args) };
+
+ let result = df_result!(result);
+
+ result.and_then(|r| r.try_into())
+ }
+
fn invoke_with_args(&self, invoke_args: ScalarFunctionArgs) ->
Result<ColumnarValue> {
let ScalarFunctionArgs {
args,
diff --git a/datafusion/ffi/src/udf/return_info.rs
b/datafusion/ffi/src/udf/return_info.rs
new file mode 100644
index 0000000000..cf76ddd1db
--- /dev/null
+++ b/datafusion/ffi/src/udf/return_info.rs
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use abi_stable::StableAbi;
+use arrow::{datatypes::DataType, ffi::FFI_ArrowSchema};
+use datafusion::{error::DataFusionError, logical_expr::ReturnInfo};
+
+use crate::arrow_wrappers::WrappedSchema;
+
+/// A stable struct for sharing a [`ReturnInfo`] across FFI boundaries.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_ReturnInfo {
+ return_type: WrappedSchema,
+ nullable: bool,
+}
+
+impl TryFrom<ReturnInfo> for FFI_ReturnInfo {
+ type Error = DataFusionError;
+
+ fn try_from(value: ReturnInfo) -> Result<Self, Self::Error> {
+ let return_type =
WrappedSchema(FFI_ArrowSchema::try_from(value.return_type())?);
+ Ok(Self {
+ return_type,
+ nullable: value.nullable(),
+ })
+ }
+}
+
+impl TryFrom<FFI_ReturnInfo> for ReturnInfo {
+ type Error = DataFusionError;
+
+ fn try_from(value: FFI_ReturnInfo) -> Result<Self, Self::Error> {
+ let return_type = DataType::try_from(&value.return_type.0)?;
+
+ Ok(ReturnInfo::new(return_type, value.nullable))
+ }
+}
diff --git a/datafusion/ffi/src/udf/return_type_args.rs
b/datafusion/ffi/src/udf/return_type_args.rs
new file mode 100644
index 0000000000..a0897630e2
--- /dev/null
+++ b/datafusion/ffi/src/udf/return_type_args.rs
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use abi_stable::{
+ std_types::{ROption, RVec},
+ StableAbi,
+};
+use arrow::datatypes::DataType;
+use datafusion::{
+ common::exec_datafusion_err, error::DataFusionError,
logical_expr::ReturnTypeArgs,
+ scalar::ScalarValue,
+};
+
+use crate::{
+ arrow_wrappers::WrappedSchema,
+ util::{rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped},
+};
+use prost::Message;
+
+/// A stable struct for sharing a [`ReturnTypeArgs`] across FFI boundaries.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_ReturnTypeArgs {
+ arg_types: RVec<WrappedSchema>,
+ scalar_arguments: RVec<ROption<RVec<u8>>>,
+ nullables: RVec<bool>,
+}
+
+impl TryFrom<ReturnTypeArgs<'_>> for FFI_ReturnTypeArgs {
+ type Error = DataFusionError;
+
+ fn try_from(value: ReturnTypeArgs) -> Result<Self, Self::Error> {
+ let arg_types = vec_datatype_to_rvec_wrapped(value.arg_types)?;
+ let scalar_arguments: Result<Vec<_>, Self::Error> = value
+ .scalar_arguments
+ .iter()
+ .map(|maybe_arg| {
+ maybe_arg
+ .map(|arg| {
+ let proto_value:
datafusion_proto::protobuf::ScalarValue =
+ arg.try_into()?;
+ let proto_bytes: RVec<u8> =
proto_value.encode_to_vec().into();
+ Ok(proto_bytes)
+ })
+ .transpose()
+ })
+ .collect();
+ let scalar_arguments =
scalar_arguments?.into_iter().map(ROption::from).collect();
+
+ let nullables = value.nullables.into();
+ Ok(Self {
+ arg_types,
+ scalar_arguments,
+ nullables,
+ })
+ }
+}
+
+// TODO(tsaucer) It would be good to find a better way around this, but it
+// appears a restriction based on the need to have a borrowed ScalarValue
+// in the arguments when converted to ReturnTypeArgs
+pub struct ForeignReturnTypeArgsOwned {
+ arg_types: Vec<DataType>,
+ scalar_arguments: Vec<Option<ScalarValue>>,
+ nullables: Vec<bool>,
+}
+
+pub struct ForeignReturnTypeArgs<'a> {
+ arg_types: &'a [DataType],
+ scalar_arguments: Vec<Option<&'a ScalarValue>>,
+ nullables: &'a [bool],
+}
+
+impl TryFrom<&FFI_ReturnTypeArgs> for ForeignReturnTypeArgsOwned {
+ type Error = DataFusionError;
+
+ fn try_from(value: &FFI_ReturnTypeArgs) -> Result<Self, Self::Error> {
+ let arg_types = rvec_wrapped_to_vec_datatype(&value.arg_types)?;
+ let scalar_arguments: Result<Vec<_>, Self::Error> = value
+ .scalar_arguments
+ .iter()
+ .map(|maybe_arg| {
+ let maybe_arg = maybe_arg.as_ref().map(|arg| {
+ let proto_value =
+
datafusion_proto::protobuf::ScalarValue::decode(arg.as_ref())
+ .map_err(|err| exec_datafusion_err!("{}", err))?;
+ let scalar_value: ScalarValue = (&proto_value).try_into()?;
+ Ok(scalar_value)
+ });
+ Option::from(maybe_arg).transpose()
+ })
+ .collect();
+ let scalar_arguments = scalar_arguments?.into_iter().collect();
+
+ let nullables = value.nullables.iter().cloned().collect();
+
+ Ok(Self {
+ arg_types,
+ scalar_arguments,
+ nullables,
+ })
+ }
+}
+
+impl<'a> From<&'a ForeignReturnTypeArgsOwned> for ForeignReturnTypeArgs<'a> {
+ fn from(value: &'a ForeignReturnTypeArgsOwned) -> Self {
+ Self {
+ arg_types: &value.arg_types,
+ scalar_arguments: value
+ .scalar_arguments
+ .iter()
+ .map(|opt| opt.as_ref())
+ .collect(),
+ nullables: &value.nullables,
+ }
+ }
+}
+
+impl<'a> From<&'a ForeignReturnTypeArgs<'a>> for ReturnTypeArgs<'a> {
+ fn from(value: &'a ForeignReturnTypeArgs) -> Self {
+ ReturnTypeArgs {
+ arg_types: value.arg_types,
+ scalar_arguments: &value.scalar_arguments,
+ nullables: value.nullables,
+ }
+ }
+}
diff --git a/datafusion/ffi/tests/ffi_integration.rs
b/datafusion/ffi/tests/ffi_integration.rs
index f610f12c82..c6df324e9a 100644
--- a/datafusion/ffi/tests/ffi_integration.rs
+++ b/datafusion/ffi/tests/ffi_integration.rs
@@ -20,84 +20,14 @@
#[cfg(feature = "integration-tests")]
mod tests {
- use abi_stable::library::RootModule;
- use datafusion::common::record_batch;
use datafusion::error::{DataFusionError, Result};
- use datafusion::logical_expr::ScalarUDF;
- use datafusion::prelude::{col, SessionContext};
+ use datafusion::prelude::SessionContext;
use datafusion_ffi::catalog_provider::ForeignCatalogProvider;
use datafusion_ffi::table_provider::ForeignTableProvider;
- use datafusion_ffi::tests::{create_record_batch, ForeignLibraryModuleRef};
- use datafusion_ffi::udf::ForeignScalarUDF;
- use std::path::Path;
+ use datafusion_ffi::tests::create_record_batch;
+ use datafusion_ffi::tests::utils::get_module;
use std::sync::Arc;
- /// Compute the path to the library. It would be preferable to simply use
- /// abi_stable::library::development_utils::compute_library_path however
- /// our current CI pipeline has a `ci` profile that we need to use to
- /// find the library.
- pub fn compute_library_path<M: RootModule>(
- target_path: &Path,
- ) -> std::io::Result<std::path::PathBuf> {
- let debug_dir = target_path.join("debug");
- let release_dir = target_path.join("release");
- let ci_dir = target_path.join("ci");
-
- let debug_path = M::get_library_path(&debug_dir.join("deps"));
- let release_path = M::get_library_path(&release_dir.join("deps"));
- let ci_path = M::get_library_path(&ci_dir.join("deps"));
-
- let all_paths = vec![
- (debug_dir.clone(), debug_path),
- (release_dir, release_path),
- (ci_dir, ci_path),
- ];
-
- let best_path = all_paths
- .into_iter()
- .filter(|(_, path)| path.exists())
- .filter_map(|(dir, path)| path.metadata().map(|m| (dir, m)).ok())
- .filter_map(|(dir, meta)| meta.modified().map(|m| (dir, m)).ok())
- .max_by_key(|(_, date)| *date)
- .map(|(dir, _)| dir)
- .unwrap_or(debug_dir);
-
- Ok(best_path)
- }
-
- fn get_module() -> Result<ForeignLibraryModuleRef> {
- let expected_version = datafusion_ffi::version();
-
- let crate_root = Path::new(env!("CARGO_MANIFEST_DIR"));
- let target_dir = crate_root
- .parent()
- .expect("Failed to find crate parent")
- .parent()
- .expect("Failed to find workspace root")
- .join("target");
-
- // Find the location of the library. This is specific to the build
environment,
- // so you will need to change the approach here based on your use case.
- // let target: &std::path::Path = "../../../../target/".as_ref();
- let library_path =
-
compute_library_path::<ForeignLibraryModuleRef>(target_dir.as_path())
- .map_err(|e| DataFusionError::External(Box::new(e)))?
- .join("deps");
-
- // Load the module
- let module =
ForeignLibraryModuleRef::load_from_directory(&library_path)
- .map_err(|e| DataFusionError::External(Box::new(e)))?;
-
- assert_eq!(
- module
- .version()
- .expect("Unable to call version on FFI module")(),
- expected_version
- );
-
- Ok(module)
- }
-
/// It is important that this test is in the `tests` directory and not in
the
/// library directory so we can verify we are building a dynamic library
and
/// testing it via a different executable.
@@ -141,46 +71,6 @@ mod tests {
test_table_provider(true).await
}
- /// This test validates that we can load an external module and use a
scalar
- /// udf defined in it via the foreign function interface. In this case we
are
- /// using the abs() function as our scalar UDF.
- #[tokio::test]
- async fn test_scalar_udf() -> Result<()> {
- let module = get_module()?;
-
- let ffi_abs_func =
- module
- .create_scalar_udf()
- .ok_or(DataFusionError::NotImplemented(
- "External table provider failed to implement
create_scalar_udf"
- .to_string(),
- ))?();
- let foreign_abs_func: ForeignScalarUDF = (&ffi_abs_func).try_into()?;
-
- let udf: ScalarUDF = foreign_abs_func.into();
-
- let ctx = SessionContext::default();
- let df = ctx.read_batch(create_record_batch(-5, 5))?;
-
- let df = df
- .with_column("abs_a", udf.call(vec![col("a")]))?
- .with_column("abs_b", udf.call(vec![col("b")]))?;
-
- let result = df.collect().await?;
-
- let expected = record_batch!(
- ("a", Int32, vec![-5, -4, -3, -2, -1]),
- ("b", Float64, vec![-5., -4., -3., -2., -1.]),
- ("abs_a", Int32, vec![5, 4, 3, 2, 1]),
- ("abs_b", Float64, vec![5., 4., 3., 2., 1.])
- )?;
-
- assert!(result.len() == 1);
- assert!(result[0] == expected);
-
- Ok(())
- }
-
#[tokio::test]
async fn test_catalog() -> Result<()> {
let module = get_module()?;
diff --git a/datafusion/ffi/tests/ffi_udf.rs b/datafusion/ffi/tests/ffi_udf.rs
new file mode 100644
index 0000000000..bbc23552de
--- /dev/null
+++ b/datafusion/ffi/tests/ffi_udf.rs
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Add an additional module here for convenience to scope this to only
+/// when the feature integration-tests is built
+#[cfg(feature = "integration-tests")]
+mod tests {
+
+ use arrow::datatypes::DataType;
+ use datafusion::common::record_batch;
+ use datafusion::error::{DataFusionError, Result};
+ use datafusion::logical_expr::ScalarUDF;
+ use datafusion::prelude::{col, SessionContext};
+
+ use datafusion_ffi::tests::create_record_batch;
+ use datafusion_ffi::tests::utils::get_module;
+ use datafusion_ffi::udf::ForeignScalarUDF;
+
+ /// This test validates that we can load an external module and use a
scalar
+ /// udf defined in it via the foreign function interface. In this case we
are
+ /// using the abs() function as our scalar UDF.
+ #[tokio::test]
+ async fn test_scalar_udf() -> Result<()> {
+ let module = get_module()?;
+
+ let ffi_abs_func =
+ module
+ .create_scalar_udf()
+ .ok_or(DataFusionError::NotImplemented(
+ "External table provider failed to implement
create_scalar_udf"
+ .to_string(),
+ ))?();
+ let foreign_abs_func: ForeignScalarUDF = (&ffi_abs_func).try_into()?;
+
+ let udf: ScalarUDF = foreign_abs_func.into();
+
+ let ctx = SessionContext::default();
+ let df = ctx.read_batch(create_record_batch(-5, 5))?;
+
+ let df = df
+ .with_column("abs_a", udf.call(vec![col("a")]))?
+ .with_column("abs_b", udf.call(vec![col("b")]))?;
+
+ let result = df.collect().await?;
+
+ let expected = record_batch!(
+ ("a", Int32, vec![-5, -4, -3, -2, -1]),
+ ("b", Float64, vec![-5., -4., -3., -2., -1.]),
+ ("abs_a", Int32, vec![5, 4, 3, 2, 1]),
+ ("abs_b", Float64, vec![5., 4., 3., 2., 1.])
+ )?;
+
+ assert!(result.len() == 1);
+ assert!(result[0] == expected);
+
+ Ok(())
+ }
+
+ /// This test validates nullary input UDFs
+ #[tokio::test]
+ async fn test_nullary_scalar_udf() -> Result<()> {
+ let module = get_module()?;
+
+ let ffi_abs_func =
+ module
+ .create_nullary_udf()
+ .ok_or(DataFusionError::NotImplemented(
+ "External table provider failed to implement
create_scalar_udf"
+ .to_string(),
+ ))?();
+ let foreign_abs_func: ForeignScalarUDF = (&ffi_abs_func).try_into()?;
+
+ let udf: ScalarUDF = foreign_abs_func.into();
+
+ let ctx = SessionContext::default();
+ let df = ctx.read_batch(create_record_batch(-5, 5))?;
+
+ let df = df.with_column("time_now", udf.call(vec![]))?;
+
+ let result = df.collect().await?;
+
+ assert!(result.len() == 1);
+ assert_eq!(
+ result[0].column_by_name("time_now").unwrap().data_type(),
+ &DataType::Float64
+ );
+
+ Ok(())
+ }
+}
diff --git a/datafusion/sql/tests/sql_integration.rs
b/datafusion/sql/tests/sql_integration.rs
index 6fd2f76973..10e5b3b1f1 100644
--- a/datafusion/sql/tests/sql_integration.rs
+++ b/datafusion/sql/tests/sql_integration.rs
@@ -4263,7 +4263,7 @@ fn test_error_message_invalid_scalar_function_signature()
{
fn test_error_message_invalid_aggregate_function_signature() {
error_message_test(
"select sum()",
- "Error during planning: 'sum' does not support zero arguments",
+ "Error during planning: Execution error: Function 'sum' user-defined
coercion failed with \"Execution error: sum function requires 1 argument, got
0\"",
);
// We keep two different prefixes because they clarify each other.
// It might be incorrect, and we should consider keeping only one.
@@ -4285,7 +4285,7 @@ fn test_error_message_invalid_window_function_signature()
{
fn test_error_message_invalid_window_aggregate_function_signature() {
error_message_test(
"select sum() over()",
- "Error during planning: 'sum' does not support zero arguments",
+ "Error during planning: Execution error: Function 'sum' user-defined
coercion failed with \"Execution error: sum function requires 1 argument, got
0\"",
);
}
diff --git a/datafusion/sqllogictest/test_files/functions.slt
b/datafusion/sqllogictest/test_files/functions.slt
index de1dbf74c2..20f79622a6 100644
--- a/datafusion/sqllogictest/test_files/functions.slt
+++ b/datafusion/sqllogictest/test_files/functions.slt
@@ -858,7 +858,7 @@ SELECT greatest(-1, 1, 2.3, 123456789, 3 + 5, -(-4),
abs(-9.0))
123456789
-query error 'greatest' does not support zero argument
+query error Function 'greatest' user-defined coercion failed with "Error
during planning: greatest was called without any arguments. It requires at
least 1."
SELECT greatest()
query I
@@ -1056,7 +1056,7 @@ SELECT least(-1, 1, 2.3, 123456789, 3 + 5, -(-4),
abs(-9.0))
-1
-query error 'least' does not support zero arguments
+query error Function 'least' user-defined coercion failed with "Error during
planning: least was called without any arguments. It requires at least 1."
SELECT least()
query I
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]