This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 4da8e4ca feat(c/sedona-extension): Move FFI scalar kernel definitions
to c/sedona-extension and write them in C (#407)
4da8e4ca is described below
commit 4da8e4ca32430125f00a106b535c047cd075172f
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Dec 9 11:25:41 2025 -0600
feat(c/sedona-extension): Move FFI scalar kernel definitions to
c/sedona-extension and write them in C (#407)
---
Cargo.lock | 15 +
Cargo.toml | 1 +
c/sedona-extension/Cargo.toml | 39 ++
c/sedona-extension/src/extension.rs | 122 ++++
{rust/sedona => c/sedona-extension}/src/lib.rs | 12 +-
c/sedona-extension/src/scalar_kernel.rs | 882 +++++++++++++++++++++++++
c/sedona-extension/src/sedona_extension.h | 210 ++++++
rust/sedona/src/ffi.rs | 515 ---------------
rust/sedona/src/lib.rs | 1 -
9 files changed, 1272 insertions(+), 525 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 15bd3d41..0295231a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4906,6 +4906,21 @@ dependencies = [
"serde_json",
]
+[[package]]
+name = "sedona-extension"
+version = "0.2.0"
+dependencies = [
+ "arrow-array",
+ "arrow-schema",
+ "datafusion-common",
+ "datafusion-expr",
+ "libc",
+ "sedona-common",
+ "sedona-expr",
+ "sedona-schema",
+ "sedona-testing",
+]
+
[[package]]
name = "sedona-functions"
version = "0.3.0"
diff --git a/Cargo.toml b/Cargo.toml
index e4372a79..9d5fb784 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,6 +16,7 @@
# under the License.
[workspace]
members = [
+ "c/sedona-extension",
"c/sedona-geoarrow-c",
"c/sedona-geos",
"c/sedona-libgpuspatial",
diff --git a/c/sedona-extension/Cargo.toml b/c/sedona-extension/Cargo.toml
new file mode 100644
index 00000000..e8f8c17c
--- /dev/null
+++ b/c/sedona-extension/Cargo.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "sedona-extension"
+version.workspace = true
+authors.workspace = true
+license.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description.workspace = true
+readme.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+arrow-array = { workspace = true, features = ["ffi"]}
+arrow-schema = { workspace = true, features = ["ffi"]}
+datafusion-common = { workspace = true }
+datafusion-expr = { workspace = true }
+libc = "0.2.178"
+sedona-common = { workspace = true }
+sedona-expr = { workspace = true }
+sedona-schema = { workspace = true }
+
+[dev-dependencies]
+# Only used by the unit tests in src/scalar_kernel.rs
+sedona-testing = { path = "../../rust/sedona-testing" }
diff --git a/c/sedona-extension/src/extension.rs
b/c/sedona-extension/src/extension.rs
new file mode 100644
index 00000000..7d957a57
--- /dev/null
+++ b/c/sedona-extension/src/extension.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ ffi::c_int,
+ os::raw::{c_char, c_void},
+ ptr::null_mut,
+};
+
+use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+
+/// Raw FFI representation of the SedonaCScalarKernel
+///
+/// See the ImportedScalarKernel and ExportedScalarKernel for high-level
+/// APIs to import and export implementations using this struct.
+///
+/// This is a `#[repr(C)]` struct: field order and types are part of the C ABI
+/// and must match the C declaration exactly. A `Default` value has every
+/// callback set to `None`, which is how an uninitialized or released kernel is
+/// represented.
+#[derive(Default)]
+#[repr(C)]
+pub struct SedonaCScalarKernel {
+    /// Return the name of the function this kernel should be registered with,
+    /// or NULL if the kernel does not carry a name.
+    pub function_name:
+        Option<unsafe extern "C" fn(self_: *const SedonaCScalarKernel) -> *const c_char>,
+    /// Populate `out` with a new [SedonaCScalarKernelImpl] instance.
+    pub new_impl: Option<
+        unsafe extern "C" fn(self_: *const SedonaCScalarKernel, out: *mut SedonaCScalarKernelImpl),
+    >,
+
+    /// Release resources held by this kernel; `None` marks a released kernel.
+    pub release: Option<unsafe extern "C" fn(self_: *mut SedonaCScalarKernel)>,
+    /// Opaque data owned by the producer of this kernel.
+    pub private_data: *mut c_void,
+}
+
+// SAFETY: NOTE(review): these impls assert that the callbacks and `private_data`
+// may be used from any thread, including concurrent calls through `&self`
+// (`function_name()` and `new_impl()` only receive a const pointer). Kernels
+// exported from Rust presumably satisfy this because they wrap a
+// `ScalarKernelRef`; C implementations must uphold the same contract — confirm
+// that this requirement is documented in the C header.
+unsafe impl Send for SedonaCScalarKernel {}
+unsafe impl Sync for SedonaCScalarKernel {}
+
+impl Drop for SedonaCScalarKernel {
+    /// Invoke the producer's release callback (if any) and mark this struct as released.
+    fn drop(&mut self) {
+        let release_callback = match self.release {
+            Some(callback) => callback,
+            // Never initialized, or already released
+            None => return,
+        };
+
+        // SAFETY: a non-None release callback means this struct still owns the
+        // producer's resources; it is called at most once because the struct is
+        // marked as released immediately afterward.
+        unsafe { release_callback(self) };
+        self.private_data = null_mut();
+        self.release = None;
+    }
+}
+
+/// Raw FFI representation of the SedonaCScalarKernelImpl
+///
+/// An instance is created by [SedonaCScalarKernel]'s `new_impl` callback. This is
+/// a `#[repr(C)]` struct: field order and types are part of the C ABI. A
+/// `Default` value has every callback set to `None` (uninitialized/released).
+#[derive(Default)]
+#[repr(C)]
+pub struct SedonaCScalarKernelImpl {
+    /// Resolve the return type for `n_args` argument types.
+    ///
+    /// Each entry of `scalar_args` is either NULL (no scalar value known for that
+    /// argument) or a single-element array holding the scalar value. Returns 0 on
+    /// success and writes the return type to `out`; a released (empty) `out`
+    /// means the kernel does not apply to these arguments. Returns nonzero on
+    /// failure, after which `get_last_error` describes the problem.
+    pub init: Option<
+        unsafe extern "C" fn(
+            self_: *mut SedonaCScalarKernelImpl,
+            arg_types: *const *const FFI_ArrowSchema,
+            scalar_args: *const *mut FFI_ArrowArray,
+            n_args: i64,
+            out: *mut FFI_ArrowSchema,
+        ) -> c_int,
+    >,
+
+    /// Compute the result for `n_rows` rows from `n_args` argument arrays and
+    /// write it to `out`. Returns 0 on success or nonzero on failure. The
+    /// Rust-backed implementation in this crate takes ownership of each argument.
+    pub execute: Option<
+        unsafe extern "C" fn(
+            self_: *mut SedonaCScalarKernelImpl,
+            args: *const *mut FFI_ArrowArray,
+            n_args: i64,
+            n_rows: i64,
+            out: *mut FFI_ArrowArray,
+        ) -> c_int,
+    >,
+
+    /// Return a message describing the most recent failure (may be NULL). The
+    /// string is owned by the implementation.
+    pub get_last_error:
+        Option<unsafe extern "C" fn(self_: *mut SedonaCScalarKernelImpl) -> *const c_char>,
+
+    /// Release resources held by this implementation; `None` marks a released value.
+    pub release: Option<unsafe extern "C" fn(self_: *mut SedonaCScalarKernelImpl)>,
+
+    /// Opaque data owned by the producer of this implementation.
+    pub private_data: *mut c_void,
+}
+
+impl Drop for SedonaCScalarKernelImpl {
+    /// Invoke the producer's release callback (if any) and mark this struct as released.
+    fn drop(&mut self) {
+        let release_callback = match self.release {
+            Some(callback) => callback,
+            // Never initialized, or already released
+            None => return,
+        };
+
+        // SAFETY: a non-None release callback means this struct still owns the
+        // producer's resources; it is called at most once because the struct is
+        // marked as released immediately afterward.
+        unsafe { release_callback(self) };
+        self.private_data = null_mut();
+        self.release = None;
+    }
+}
+
+/// Check if a schema is valid
+///
+/// [FFI_ArrowSchema] doesn't expose its release callback, so this reads it
+/// through a layout-compatible mirror struct. Returns `false` for a null
+/// pointer or for a schema whose release callback is NULL (i.e., an empty or
+/// already-released schema).
+pub fn ffi_arrow_schema_is_valid(schema: *const FFI_ArrowSchema) -> bool {
+    // SAFETY: ArrowSchemaInternal mirrors the #[repr(C)] layout of the Arrow C
+    // data interface's ArrowSchema, which FFI_ArrowSchema also follows.
+    let mirrored = unsafe { schema.cast::<ArrowSchemaInternal>().as_ref() };
+    match mirrored {
+        Some(internal) => internal.release.is_some(),
+        None => false,
+    }
+}
+
+/// Layout-compatible mirror of the Arrow C data interface `struct ArrowSchema`
+///
+/// Only used by [ffi_arrow_schema_is_valid] to read the `release` callback,
+/// which [FFI_ArrowSchema] does not expose. Field order and types must match
+/// the C definition exactly; the remaining fields exist only to fix the layout.
+#[repr(C)]
+struct ArrowSchemaInternal {
+    format: *const c_char,
+    name: *const c_char,
+    metadata: *const c_char,
+    flags: i64,
+    n_children: i64,
+    children: *mut *mut ArrowSchemaInternal,
+    dictionary: *mut ArrowSchemaInternal,
+    release: Option<unsafe extern "C" fn(*mut ArrowSchemaInternal)>,
+    private_data: *mut c_void,
+}
diff --git a/rust/sedona/src/lib.rs b/c/sedona-extension/src/lib.rs
similarity index 82%
copy from rust/sedona/src/lib.rs
copy to c/sedona-extension/src/lib.rs
index 52b54387..adc0a921 100644
--- a/rust/sedona/src/lib.rs
+++ b/c/sedona-extension/src/lib.rs
@@ -14,12 +14,6 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-mod catalog;
-pub mod context;
-mod exec;
-pub mod ffi;
-mod object_storage;
-pub mod random_geometry_provider;
-pub mod reader;
-pub mod record_batch_reader_provider;
-pub mod show;
+
+pub(crate) mod extension;
+pub mod scalar_kernel;
+
+// The raw FFI structs are the input/output of the public conversions in
+// `scalar_kernel` (e.g., `ImportedScalarKernel::try_from(SedonaCScalarKernel)`),
+// so callers outside this crate must be able to name them.
+pub use extension::{SedonaCScalarKernel, SedonaCScalarKernelImpl};
diff --git a/c/sedona-extension/src/scalar_kernel.rs
b/c/sedona-extension/src/scalar_kernel.rs
new file mode 100644
index 00000000..17972356
--- /dev/null
+++ b/c/sedona-extension/src/scalar_kernel.rs
@@ -0,0 +1,882 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::{
+ ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema},
+ make_array, ArrayRef,
+};
+use arrow_schema::{ArrowError, Field};
+use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
+use datafusion_expr::ColumnarValue;
+use sedona_common::sedona_internal_err;
+use sedona_expr::scalar_udf::{ScalarKernelRef, SedonaScalarKernel};
+use sedona_schema::datatypes::SedonaType;
+use std::{
+ ffi::{c_char, c_int, c_void, CStr, CString},
+ fmt::Debug,
+ iter::zip,
+ ptr::{null, null_mut, swap_nonoverlapping},
+ str::FromStr,
+};
+
+use crate::extension::{ffi_arrow_schema_is_valid, SedonaCScalarKernel,
SedonaCScalarKernelImpl};
+
+/// Wrapper around a [SedonaCScalarKernel] that implements [SedonaScalarKernel]
+///
+/// This is the means by which a kernel implementation may be imported from a
+/// C implementation.
+pub struct ImportedScalarKernel {
+    /// The imported kernel; its release callback runs when this wrapper is dropped
+    inner: SedonaCScalarKernel,
+    /// Function name reported by the kernel at import time, if any
+    function_name: Option<String>,
+}
+
+impl Debug for ImportedScalarKernel {
+    /// Format this kernel for diagnostics, including the imported function name
+    /// (the raw FFI struct itself is opaque and shown as a placeholder).
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ImportedScalarKernel")
+            .field("inner", &"<SedonaCScalarKernel>")
+            .field("function_name", &self.function_name)
+            .finish()
+    }
+}
+
+impl TryFrom<SedonaCScalarKernel> for ImportedScalarKernel {
+    type Error = DataFusionError;
+
+    /// Take ownership of a populated [SedonaCScalarKernel], caching its function name.
+    ///
+    /// Fails if any required callback is missing (i.e., the kernel is released or
+    /// was never initialized); in that case `value` is dropped as usual.
+    fn try_from(value: SedonaCScalarKernel) -> Result<Self> {
+        let name_callback = match (value.function_name, value.new_impl, value.release) {
+            (Some(callback), Some(_), Some(_)) => callback,
+            _ => {
+                return sedona_internal_err!(
+                    "Can't import released or uninitialized SedonaCScalarKernel"
+                )
+            }
+        };
+
+        // SAFETY: the kernel is populated, so its callbacks may be called with it.
+        let raw_name = unsafe { name_callback(&value) };
+        let function_name = if raw_name.is_null() {
+            None
+        } else {
+            // SAFETY: a non-null name is a NUL-terminated string owned by the kernel.
+            let name = unsafe { CStr::from_ptr(raw_name) };
+            Some(String::from(name.to_string_lossy()))
+        };
+
+        Ok(Self {
+            inner: value,
+            function_name,
+        })
+    }
+}
+
+impl ImportedScalarKernel {
+    /// The function name this kernel was exported with, or `None` if the kernel
+    /// did not provide one
+    pub fn function_name(&self) -> Option<&str> {
+        self.function_name.as_deref()
+    }
+}
+
+impl SedonaScalarKernel for ImportedScalarKernel {
+    /// Resolve the return type by initializing a fresh FFI kernel implementation.
+    fn return_type_from_args_and_scalars(
+        &self,
+        args: &[SedonaType],
+        scalar_args: &[Option<&ScalarValue>],
+    ) -> Result<Option<SedonaType>> {
+        CScalarKernelImplWrapper::try_new(&self.inner)?.init(args, scalar_args)
+    }
+
+    /// Execute via a fresh FFI kernel implementation.
+    ///
+    /// Returns an array if any argument is an array; otherwise the result must
+    /// have exactly one element and is returned as a scalar.
+    fn invoke_batch_from_args(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+        return_type: &SedonaType,
+        num_rows: usize,
+    ) -> Result<ColumnarValue> {
+        // Scalar arguments are made available to init(); arrays are not
+        let scalar_args: Vec<Option<&ScalarValue>> = args
+            .iter()
+            .map(|arg| match arg {
+                ColumnarValue::Scalar(scalar) => Some(scalar),
+                ColumnarValue::Array(_) => None,
+            })
+            .collect();
+
+        let mut kernel_impl = CScalarKernelImplWrapper::try_new(&self.inner)?;
+        kernel_impl.init(arg_types, &scalar_args)?;
+        let result_array = kernel_impl.execute(args, return_type, num_rows)?;
+
+        if args
+            .iter()
+            .any(|arg| matches!(arg, ColumnarValue::Array(_)))
+        {
+            return Ok(ColumnarValue::Array(result_array));
+        }
+
+        match result_array.len() {
+            1 => Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
+                &result_array,
+                0,
+            )?)),
+            len => sedona_internal_err!(
+                "Expected scalar result but got result with length {}",
+                len
+            ),
+        }
+    }
+
+    fn return_type(&self, _args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        sedona_internal_err!(
+            "Should not be called because return_type_from_args_and_scalars() is implemented"
+        )
+    }
+
+    fn invoke_batch(
+        &self,
+        _arg_types: &[SedonaType],
+        _args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        sedona_internal_err!("Should not be called because invoke_batch_from_args() is implemented")
+    }
+}
+
+/// Wrapper class handling the verbose details of preparing and executing FFI calls
+/// for the [SedonaCScalarKernelImpl]
+///
+/// Dropping this wrapper drops `inner`, which invokes the implementation's
+/// release callback.
+struct CScalarKernelImplWrapper {
+    inner: SedonaCScalarKernelImpl,
+}
+
+impl CScalarKernelImplWrapper {
+    /// Create a new kernel implementation via the factory's `new_impl` callback
+    ///
+    /// `new_impl` returns no status: if it leaves `inner` unpopulated, the problem
+    /// only surfaces later as an "Invalid SedonaCScalarKernelImpl" error from
+    /// [Self::init] or [Self::execute].
+    fn try_new(factory: &SedonaCScalarKernel) -> Result<Self> {
+        if let Some(init) = factory.new_impl {
+            let mut inner = SedonaCScalarKernelImpl::default();
+            unsafe { init(factory, &mut inner) };
+            Ok(Self { inner })
+        } else {
+            sedona_internal_err!("SedonaCScalarKernel is not valid")
+        }
+    }
+
+    /// Resolve the return type for `arg_types` via the FFI `init` callback
+    ///
+    /// `arg_scalars` must have the same length as `arg_types`; `None` entries are
+    /// passed to the callback as NULL pointers. Returns `Ok(None)` when the
+    /// implementation reports that it does not apply to these arguments.
+    fn init(
+        &mut self,
+        arg_types: &[SedonaType],
+        arg_scalars: &[Option<&ScalarValue>],
+    ) -> Result<Option<SedonaType>> {
+        if arg_types.len() != arg_scalars.len() {
+            return sedona_internal_err!("field/scalar lengths must be identical");
+        }
+
+        // Convert arg_types to Vec<Field>
+        let arg_fields = arg_types
+            .iter()
+            .map(|sedona_type| sedona_type.to_storage_field("", true))
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert arg types to Vec<FFI_ArrowSchema>. These must outlive the FFI
+        // call below because the callee only receives pointers to them.
+        let ffi_fields = arg_fields
+            .iter()
+            .map(FFI_ArrowSchema::try_from)
+            .collect::<Result<Vec<_>, ArrowError>>()?;
+
+        // Convert arg types to Vec<*const FFI_ArrowSchema>
+        let ffi_field_ptrs = ffi_fields
+            .iter()
+            .map(|ffi_field| ffi_field as *const FFI_ArrowSchema)
+            .collect::<Vec<_>>();
+
+        // Convert arg_scalars to Vec<Option<FFI_ArrowArray>>
+        let mut ffi_scalars = arg_scalars
+            .iter()
+            .map(|maybe_scalar| {
+                if let Some(scalar) = maybe_scalar {
+                    let array = scalar.to_array()?;
+                    Ok(Some(FFI_ArrowArray::new(&array.to_data())))
+                } else {
+                    Ok(None)
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert arg_scalars to Vec<*mut FFI_ArrowArray>, using NULL for
+        // arguments without a scalar value. The callee may move out of these
+        // arrays; any it leaves populated are released when ffi_scalars is dropped.
+        let ffi_scalar_ptrs = ffi_scalars
+            .iter_mut()
+            .map(|maybe_ffi_scalar| match maybe_ffi_scalar {
+                Some(ffi_scalar) => ffi_scalar as *mut FFI_ArrowArray,
+                None => null_mut(),
+            })
+            .collect::<Vec<_>>();
+
+        // Call the FFI implementation of init
+        if let Some(init) = self.inner.init {
+            let mut ffi_out = FFI_ArrowSchema::empty();
+            let code = unsafe {
+                init(
+                    &mut self.inner,
+                    ffi_field_ptrs.as_ptr(),
+                    ffi_scalar_ptrs.as_ptr(),
+                    arg_types.len() as i64,
+                    &mut ffi_out,
+                )
+            };
+
+            // On success, convert the output to SedonaType. If the implementation
+            // returned a "released" schema, this is the equivalent to our return_type()
+            // returning None (for "this kernel doesn't apply").
+            // On error, query the FFI implementation for the last error string.
+            if code == 0 {
+                if ffi_arrow_schema_is_valid(&ffi_out) {
+                    let field = Field::try_from(&ffi_out)?;
+                    Ok(Some(SedonaType::from_storage_field(&field)?))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                plan_err!(
+                    "SedonaCScalarKernelImpl::init failed: {}",
+                    self.last_error(code)
+                )
+            }
+        } else {
+            sedona_internal_err!("Invalid SedonaCScalarKernelImpl")
+        }
+    }
+
+    /// Compute the result for `num_rows` rows via the FFI `execute` callback
+    ///
+    /// Scalar arguments are passed as arrays produced by `ScalarValue::to_array()`.
+    /// The returned FFI array is interpreted using the storage type of `return_type`.
+    fn execute(
+        &mut self,
+        args: &[ColumnarValue],
+        return_type: &SedonaType,
+        num_rows: usize,
+    ) -> Result<ArrayRef> {
+        // Convert args to Vec<ArrayRef>
+        let arg_arrays = args
+            .iter()
+            .map(|arg| match arg {
+                ColumnarValue::Array(array) => Ok(array.clone()),
+                ColumnarValue::Scalar(scalar_value) => scalar_value.to_array(),
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert args to Vec<FFI_ArrowArray>. These must outlive the FFI call
+        // below; any the callee does not move out of are released on drop.
+        let mut ffi_args = arg_arrays
+            .iter()
+            .map(|arg| FFI_ArrowArray::new(&arg.to_data()))
+            .collect::<Vec<_>>();
+        let ffi_arg_ptrs = ffi_args
+            .iter_mut()
+            .map(|arg| arg as *mut FFI_ArrowArray)
+            .collect::<Vec<_>>();
+
+        // Call the FFI implementation of execute()
+        if let Some(execute) = self.inner.execute {
+            let mut ffi_out = FFI_ArrowArray::empty();
+            let code = unsafe {
+                execute(
+                    &mut self.inner,
+                    ffi_arg_ptrs.as_ptr(),
+                    args.len() as i64,
+                    num_rows as i64,
+                    &mut ffi_out,
+                )
+            };
+
+            // On success, convert the result to an ArrayRef.
+            // On error, query the FFI implementation for the last error string.
+            if code == 0 {
+                let data = unsafe {
+                    arrow_array::ffi::from_ffi_and_data_type(
+                        ffi_out,
+                        return_type.storage_type().clone(),
+                    )?
+                };
+                Ok(arrow_array::make_array(data))
+            } else {
+                plan_err!(
+                    "SedonaCScalarKernelImpl::execute failed: {}",
+                    self.last_error(code)
+                )
+            }
+        } else {
+            sedona_internal_err!("Invalid SedonaCScalarKernelImpl")
+        }
+    }
+
+    /// Helper to get the last error from the FFI implementation as a Rust String
+    ///
+    /// Falls back to the numeric `code` when the implementation returns a NULL
+    /// message.
+    fn last_error(&mut self, code: c_int) -> String {
+        if let Some(get_last_error) = self.inner.get_last_error {
+            let c_err = unsafe { get_last_error(&mut self.inner) };
+            if c_err.is_null() {
+                format!("({code})")
+            } else {
+                unsafe { CStr::from_ptr(c_err) }
+                    .to_string_lossy()
+                    .into_owned()
+            }
+        } else {
+            "Invalid SedonaCScalarKernelImpl".to_string()
+        }
+    }
+}
+
+/// Wrapper around a [ScalarKernelRef] that may be used to export an existing
+/// kernel across an FFI boundary using the [SedonaCScalarKernel]
+pub struct ExportedScalarKernel {
+    /// The Rust kernel being exported
+    inner: ScalarKernelRef,
+    /// Optional name returned by the C `function_name` callback; stored as a
+    /// CString so the pointer handed out stays valid while this struct is alive
+    function_name: Option<CString>,
+}
+
+impl From<ScalarKernelRef> for ExportedScalarKernel {
+    /// Wrap `kernel` for export without an associated function name.
+    fn from(kernel: ScalarKernelRef) -> Self {
+        Self {
+            function_name: None,
+            inner: kernel,
+        }
+    }
+}
+
+impl From<ExportedScalarKernel> for SedonaCScalarKernel {
+    /// Move `value` onto the heap and expose it through the `c_factory_*` callbacks.
+    fn from(value: ExportedScalarKernel) -> Self {
+        // Ownership of this allocation is reclaimed by c_factory_release()
+        let private_data = Box::into_raw(Box::new(value)).cast::<c_void>();
+        Self {
+            function_name: Some(c_factory_function_name),
+            new_impl: Some(c_factory_new_impl),
+            release: Some(c_factory_release),
+            private_data,
+        }
+    }
+}
+
+impl ExportedScalarKernel {
+    /// Add a function name to this exported kernel
+    ///
+    /// This ensures the kernel will be registered with the appropriate function
+    /// when passed across a boundary.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `function_name` contains an interior NUL byte, because the name
+    /// is handed to C as a NUL-terminated string.
+    pub fn with_function_name(self, function_name: impl AsRef<str>) -> Self {
+        let function_name = CString::from_str(function_name.as_ref())
+            .expect("function_name must not contain interior NUL bytes");
+        Self {
+            function_name: Some(function_name),
+            ..self
+        }
+    }
+
+    /// Create a new [ExportedScalarKernelImpl] wrapping a clone of the inner kernel
+    fn new_impl(&self) -> ExportedScalarKernelImpl {
+        ExportedScalarKernelImpl::new(self.inner.clone())
+    }
+}
+
+/// C callable wrapper to expose [ExportedScalarKernel::function_name]
+///
+/// Returns a pointer into the exported kernel's CString (valid until the kernel
+/// is released) or NULL when no name was set.
+unsafe extern "C" fn c_factory_function_name(self_: *const SedonaCScalarKernel) -> *const c_char {
+    assert!(!self_.is_null());
+    let kernel = &*self_;
+
+    assert!(!kernel.private_data.is_null());
+    let exported = &*(kernel.private_data as *const ExportedScalarKernel);
+
+    exported
+        .function_name
+        .as_ref()
+        .map_or(null(), |name| name.as_ptr())
+}
+
+/// C callable wrapper around [ExportedScalarKernel::new_impl]
+///
+/// Writes a newly allocated [SedonaCScalarKernelImpl] to `out`. The caller owns
+/// the result and is responsible for eventually invoking its release callback.
+unsafe extern "C" fn c_factory_new_impl(
+    self_: *const SedonaCScalarKernel,
+    out: *mut SedonaCScalarKernelImpl,
+) {
+    assert!(!self_.is_null());
+    let self_ref = self_.as_ref().unwrap();
+
+    assert!(!self_ref.private_data.is_null());
+    let private_data = (self_ref.private_data as *mut ExportedScalarKernel)
+        .as_ref()
+        .unwrap();
+
+    assert!(!out.is_null());
+    // `out` may point to uninitialized memory owned by a C caller. Assigning with
+    // `*out = ...` would first drop the old value, invoking whatever garbage is in
+    // its `release` field; write() moves the new value in without reading `*out`.
+    out.write(SedonaCScalarKernelImpl::from(private_data.new_impl()));
+}
+
+/// C Callable wrapper called when this value is dropped via FFI
+///
+/// Frees the boxed [ExportedScalarKernel] and marks the struct as released.
+unsafe extern "C" fn c_factory_release(self_: *mut SedonaCScalarKernel) {
+    assert!(!self_.is_null());
+    let kernel = &mut *self_;
+
+    assert!(!kernel.private_data.is_null());
+    // Reclaim ownership of the Box leaked when the kernel was exported so that
+    // dropping it frees the exported kernel.
+    let exported = Box::from_raw(kernel.private_data.cast::<ExportedScalarKernel>());
+    drop(exported);
+
+    kernel.release = None;
+    kernel.private_data = null_mut();
+}
+
+/// Rust-backed implementation of [SedonaCScalarKernelImpl]
+struct ExportedScalarKernelImpl {
+    /// The Rust kernel being exported
+    inner: ScalarKernelRef,
+    /// Argument types recorded by init() for use by execute()
+    last_arg_types: Option<Vec<SedonaType>>,
+    /// Return type recorded by init(); `None` if the kernel did not apply
+    last_return_type: Option<SedonaType>,
+    /// Message for the most recent failure, exposed via the C `get_last_error` callback
+    last_error: CString,
+}
+
+impl From<ExportedScalarKernelImpl> for SedonaCScalarKernelImpl {
+    /// Move `value` onto the heap and expose it through the `c_kernel_*` callbacks.
+    fn from(value: ExportedScalarKernelImpl) -> Self {
+        // Ownership of this allocation is reclaimed by c_kernel_release()
+        let private_data = Box::into_raw(Box::new(value)).cast::<c_void>();
+        Self {
+            init: Some(c_kernel_init),
+            execute: Some(c_kernel_execute),
+            get_last_error: Some(c_kernel_last_error),
+            release: Some(c_kernel_release),
+            private_data,
+        }
+    }
+}
+
+impl ExportedScalarKernelImpl {
+    /// Create a new implementation wrapping `kernel` with no init() state
+    fn new(kernel: ScalarKernelRef) -> Self {
+        Self {
+            inner: kernel,
+            last_arg_types: None,
+            last_return_type: None,
+            last_error: CString::default(),
+        }
+    }
+
+    /// Resolve the return type of the wrapped kernel from FFI argument types and
+    /// optional FFI scalar arguments
+    ///
+    /// `ffi_types` and `ffi_scalar_args` must have the same length. Non-null
+    /// entries of `ffi_scalar_args` are moved out of (leaving them released);
+    /// NULL entries denote arguments without a known scalar value. Returns
+    /// `Ok(None)` if the wrapped kernel does not apply to these arguments.
+    ///
+    /// The argument types and return type are saved for [Self::execute]. State
+    /// from any earlier call is cleared first so that a failed init() cannot leave
+    /// stale types behind for a subsequent execute().
+    fn init(
+        &mut self,
+        ffi_types: &[*const FFI_ArrowSchema],
+        ffi_scalar_args: &[*mut FFI_ArrowArray],
+    ) -> Result<Option<FFI_ArrowSchema>> {
+        self.last_arg_types = None;
+        self.last_return_type = None;
+
+        // zip() below would otherwise silently drop unmatched entries
+        if ffi_types.len() != ffi_scalar_args.len() {
+            return sedona_internal_err!(
+                "Expected {} scalar arguments but got {}",
+                ffi_types.len(),
+                ffi_scalar_args.len()
+            );
+        }
+
+        // Convert the input types to Vec<Field>
+        let arg_fields = ffi_types
+            .iter()
+            .map(|ptr| {
+                if let Some(ffi_schema) = unsafe { ptr.as_ref() } {
+                    Field::try_from(ffi_schema)
+                } else {
+                    Err(ArrowError::CDataInterface(
+                        "FFI_ArrowSchema is NULL".to_string(),
+                    ))
+                }
+            })
+            .collect::<Result<Vec<_>, ArrowError>>()?;
+
+        // Convert the input types to Vec<SedonaType>
+        let args = arg_fields
+            .iter()
+            .map(SedonaType::from_storage_field)
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert the scalar arguments to Vec<Option<ArrayRef>>, taking ownership
+        // of each non-null FFI array
+        let arg_arrays = zip(ffi_scalar_args, &args)
+            .map(|(ptr, arg)| {
+                if ptr.is_null() {
+                    Ok(None)
+                } else {
+                    let owned_ffi_array = unsafe { FFI_ArrowArray::from_raw(*ptr) };
+                    let data = unsafe {
+                        from_ffi_and_data_type(owned_ffi_array, arg.storage_type().clone())?
+                    };
+                    Ok(Some(make_array(data)))
+                }
+            })
+            .collect::<Result<Vec<_>, ArrowError>>()?;
+
+        // Convert the scalar arguments to Vec<Option<ScalarValue>>
+        let scalar_args = arg_arrays
+            .iter()
+            .map(|maybe_array| {
+                if let Some(array) = maybe_array {
+                    Ok(Some(ScalarValue::try_from_array(array, 0)?))
+                } else {
+                    Ok(None)
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert the scalar arguments to Vec<Option<&ScalarValue>>
+        let scalar_arg_refs = scalar_args
+            .iter()
+            .map(|arg| arg.as_ref())
+            .collect::<Vec<_>>();
+
+        // Call the implementation
+        let maybe_return_type = self
+            .inner
+            .return_type_from_args_and_scalars(&args, &scalar_arg_refs)?;
+
+        // Convert the result to FFI_ArrowSchema (if not None)
+        let return_ffi_schema = if let Some(return_type) = &maybe_return_type {
+            let return_field = return_type.to_storage_field("", true)?;
+            let return_ffi_schema = FFI_ArrowSchema::try_from(&return_field)?;
+            Some(return_ffi_schema)
+        } else {
+            None
+        };
+
+        // Save the argument types and return type for following calls to execute()
+        self.last_arg_types = Some(args);
+        self.last_return_type = maybe_return_type;
+
+        Ok(return_ffi_schema)
+    }
+
+    /// Invoke the wrapped kernel on FFI argument arrays using the types saved by
+    /// the most recent successful init()
+    ///
+    /// Each argument array is moved out of (leaving it released). Arguments whose
+    /// length differs from `num_rows` are treated as scalars (their first element
+    /// is used).
+    fn execute(&self, ffi_args: &[*mut FFI_ArrowArray], num_rows: i64) -> Result<FFI_ArrowArray> {
+        match (&self.last_arg_types, &self.last_return_type) {
+            (Some(arg_types), Some(return_type)) => {
+                // zip() below would otherwise silently drop unmatched arguments
+                if ffi_args.len() != arg_types.len() {
+                    return sedona_internal_err!(
+                        "Expected {} arguments but got {}",
+                        arg_types.len(),
+                        ffi_args.len()
+                    );
+                }
+
+                // A negative row count would wrap around when cast to usize below
+                if num_rows < 0 {
+                    return sedona_internal_err!(
+                        "Expected non-negative num_rows but got {}",
+                        num_rows
+                    );
+                }
+
+                // Resolve args as Vec<ArrayRef>, taking ownership of each FFI array
+                let arg_arrays = zip(ffi_args, arg_types)
+                    .map(|(ptr, arg)| {
+                        // FFI_ArrowArray::from_raw() requires a valid, non-null pointer
+                        if ptr.is_null() {
+                            return Err(ArrowError::CDataInterface(
+                                "FFI_ArrowArray is NULL".to_string(),
+                            ));
+                        }
+
+                        let owned_ffi_array = unsafe { FFI_ArrowArray::from_raw(*ptr) };
+                        let data = unsafe {
+                            from_ffi_and_data_type(owned_ffi_array, arg.storage_type().clone())?
+                        };
+                        Ok(make_array(data))
+                    })
+                    .collect::<Result<Vec<_>, ArrowError>>()?;
+
+                // Resolve args as Vec<ColumnarValue>
+                let args = arg_arrays
+                    .into_iter()
+                    .map(|array| {
+                        if array.len() as i64 == num_rows {
+                            Ok(ColumnarValue::Array(array))
+                        } else {
+                            Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
+                                &array, 0,
+                            )?))
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                // Call the implementation
+                let result_value = self.inner.invoke_batch_from_args(
+                    arg_types,
+                    &args,
+                    return_type,
+                    num_rows as usize,
+                )?;
+
+                // Convert the result to an ArrayRef
+                let result_array = match result_value {
+                    ColumnarValue::Array(array) => array,
+                    ColumnarValue::Scalar(scalar_value) => scalar_value.to_array()?,
+                };
+
+                // Convert the result to a FFI_ArrowArray
+                let result_ffi_array = FFI_ArrowArray::new(&result_array.to_data());
+                Ok(result_ffi_array)
+            }
+            _ => {
+                sedona_internal_err!("Call to ExportedScalarKernel::execute() before init()")
+            }
+        }
+    }
+}
+
+/// C callable wrapper around [ExportedScalarKernelImpl::init]
+///
+/// Returns 0 on success, writing either the resolved return type or a released
+/// (empty) schema to `out`; the latter means the kernel does not apply to these
+/// arguments. Returns `EINVAL` on failure, in which case a message is available
+/// from [c_kernel_last_error]. `arg_types` and `scalar_args` may be NULL when
+/// `n_args` is zero.
+unsafe extern "C" fn c_kernel_init(
+    self_: *mut SedonaCScalarKernelImpl,
+    arg_types: *const *const FFI_ArrowSchema,
+    scalar_args: *const *mut FFI_ArrowArray,
+    n_args: i64,
+    out: *mut FFI_ArrowSchema,
+) -> c_int {
+    assert!(!self_.is_null());
+    let self_ref = self_.as_ref().unwrap();
+
+    assert!(!self_ref.private_data.is_null());
+    let private_data = (self_ref.private_data as *mut ExportedScalarKernelImpl)
+        .as_mut()
+        .unwrap();
+
+    assert!(!out.is_null());
+
+    // A negative count would wrap around to an enormous slice length
+    let n_args = match usize::try_from(n_args) {
+        Ok(n_args) => n_args,
+        Err(_) => {
+            private_data.last_error =
+                CString::new(format!("Expected non-negative n_args but got {n_args}"))
+                    .unwrap_or_default();
+            return libc::EINVAL;
+        }
+    };
+
+    // C callers may pass NULL for empty arrays, but slice::from_raw_parts()
+    // requires a non-null pointer even when the length is zero.
+    if n_args > 0 && (arg_types.is_null() || scalar_args.is_null()) {
+        private_data.last_error =
+            CString::new("arg_types and scalar_args must not be NULL when n_args > 0")
+                .unwrap_or_default();
+        return libc::EINVAL;
+    }
+    let ffi_types: &[*const FFI_ArrowSchema] = if n_args == 0 {
+        &[]
+    } else {
+        std::slice::from_raw_parts(arg_types, n_args)
+    };
+    let ffi_scalar_args: &[*mut FFI_ArrowArray] = if n_args == 0 {
+        &[]
+    } else {
+        std::slice::from_raw_parts(scalar_args, n_args)
+    };
+
+    match private_data.init(ffi_types, ffi_scalar_args) {
+        Ok(Some(return_ffi_schema)) => {
+            // `out` may point to uninitialized memory owned by a C caller: move the
+            // result in without reading or releasing whatever is currently there.
+            out.write(return_ffi_schema);
+            0
+        }
+        Ok(None) => {
+            // A released (empty) schema signals that this kernel doesn't apply
+            out.write(FFI_ArrowSchema::empty());
+            0
+        }
+        Err(err) => {
+            // Interior NUL bytes would make CString::new() fail and lose the message
+            private_data.last_error =
+                CString::new(err.message().replace('\0', " ")).unwrap_or_default();
+            libc::EINVAL
+        }
+    }
+}
+
+/// C callable wrapper around [ExportedScalarKernelImpl::execute]
+///
+/// Returns 0 on success with the result moved into `out`, or `EINVAL` on
+/// failure, in which case a message is available from [c_kernel_last_error].
+/// `args` may be NULL when `n_args` is zero.
+unsafe extern "C" fn c_kernel_execute(
+    self_: *mut SedonaCScalarKernelImpl,
+    args: *const *mut FFI_ArrowArray,
+    n_args: i64,
+    n_rows: i64,
+    out: *mut FFI_ArrowArray,
+) -> c_int {
+    assert!(!self_.is_null());
+    let self_ref = self_.as_ref().unwrap();
+
+    assert!(!self_ref.private_data.is_null());
+    let private_data = (self_ref.private_data as *mut ExportedScalarKernelImpl)
+        .as_mut()
+        .unwrap();
+
+    assert!(!out.is_null());
+
+    // A negative count would wrap around to an enormous slice length
+    let n_args = match usize::try_from(n_args) {
+        Ok(n_args) => n_args,
+        Err(_) => {
+            private_data.last_error =
+                CString::new(format!("Expected non-negative n_args but got {n_args}"))
+                    .unwrap_or_default();
+            return libc::EINVAL;
+        }
+    };
+
+    // C callers may pass NULL for an empty array, but slice::from_raw_parts()
+    // requires a non-null pointer even when the length is zero.
+    if n_args > 0 && args.is_null() {
+        private_data.last_error =
+            CString::new("args must not be NULL when n_args > 0").unwrap_or_default();
+        return libc::EINVAL;
+    }
+    let ffi_args: &[*mut FFI_ArrowArray] = if n_args == 0 {
+        &[]
+    } else {
+        std::slice::from_raw_parts(args, n_args)
+    };
+
+    match private_data.execute(ffi_args, n_rows) {
+        Ok(ffi_array) => {
+            // `out` may point to uninitialized memory owned by a C caller: move the
+            // result in without reading or releasing whatever is currently there.
+            out.write(ffi_array);
+            0
+        }
+        Err(err) => {
+            // Interior NUL bytes would make CString::new() fail and lose the message
+            private_data.last_error =
+                CString::new(err.message().replace('\0', " ")).unwrap_or_default();
+            libc::EINVAL
+        }
+    }
+}
+
+/// C Callable wrapper to retrieve the last error string
+///
+/// The returned pointer refers to a CString owned by the implementation; it stays
+/// valid until the next failing call or until the implementation is released.
+unsafe extern "C" fn c_kernel_last_error(self_: *mut SedonaCScalarKernelImpl) -> *const c_char {
+    assert!(!self_.is_null());
+    let kernel_impl = &*self_;
+
+    assert!(!kernel_impl.private_data.is_null());
+    let state = &*kernel_impl
+        .private_data
+        .cast::<ExportedScalarKernelImpl>();
+    state.last_error.as_ptr()
+}
+
+/// C Callable wrapper called when this value is dropped via FFI
+///
+/// Frees the boxed [ExportedScalarKernelImpl] and marks the struct as released.
+unsafe extern "C" fn c_kernel_release(self_: *mut SedonaCScalarKernelImpl) {
+    assert!(!self_.is_null());
+    let kernel_impl = &mut *self_;
+
+    assert!(!kernel_impl.private_data.is_null());
+    // Reclaim ownership of the Box leaked when the implementation was created so
+    // that dropping it frees the Rust-side state.
+    let state = Box::from_raw(kernel_impl.private_data.cast::<ExportedScalarKernelImpl>());
+    drop(state);
+
+    kernel_impl.release = None;
+    kernel_impl.private_data = null_mut();
+}
+
+#[cfg(test)]
+mod test {
+    // Round-trip tests: kernels are exported with ExportedScalarKernel, converted
+    // to the raw SedonaCScalarKernel, re-imported with ImportedScalarKernel, and
+    // checked against the expected (native) behavior and error messages.
+    use std::sync::Arc;
+
+    use arrow_schema::DataType;
+    use datafusion_common::exec_err;
+    use datafusion_expr::Volatility;
+    use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel};
+    use sedona_schema::{datatypes::WKB_GEOMETRY, matchers::ArgMatcher};
+    use sedona_testing::{create::create_array, testers::ScalarUdfTester};
+
+    use super::*;
+
+    /// An identity kernel should behave the same natively and after an FFI roundtrip
+    #[test]
+    fn ffi_roundtrip() {
+        let kernel = SimpleSedonaScalarKernel::new_ref(
+            ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY),
+            Arc::new(|_, args| Ok(args[0].clone())),
+        );
+
+        let array_value = create_array(&[Some("POINT (0 1)"), None], &WKB_GEOMETRY);
+
+        // Baseline: the kernel invoked natively
+        let udf_native = SedonaScalarUDF::new(
+            "simple_udf",
+            vec![kernel.clone()],
+            Volatility::Immutable,
+            None,
+        );
+
+        let tester = ScalarUdfTester::new(udf_native.into(), vec![WKB_GEOMETRY]);
+        tester.assert_return_type(WKB_GEOMETRY);
+
+        let result = tester.invoke_scalar("POINT (0 1)").unwrap();
+        tester.assert_scalar_result_equals(result, "POINT (0 1)");
+
+        assert_eq!(
+            &tester.invoke_array(array_value.clone()).unwrap(),
+            &array_value
+        );
+
+        // The same kernel after export -> raw FFI struct -> import
+        let exported_kernel = ExportedScalarKernel::from(kernel.clone());
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+
+        let udf_from_ffi = SedonaScalarUDF::new(
+            "simple_udf_from_ffi",
+            vec![Arc::new(imported_kernel)],
+            Volatility::Immutable,
+            None,
+        );
+
+        let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![WKB_GEOMETRY]);
+        ffi_tester.assert_return_type(WKB_GEOMETRY);
+
+        let result = ffi_tester.invoke_scalar("POINT (0 1)").unwrap();
+        ffi_tester.assert_scalar_result_equals(result, "POINT (0 1)");
+
+        assert_eq!(
+            &ffi_tester.invoke_array(array_value.clone()).unwrap(),
+            &array_value
+        );
+
+        // Check the case of a kernel that does not apply to input arguments
+        let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![]);
+        let err = ffi_tester.return_type().unwrap_err();
+        assert_eq!(
+            err.message(),
+            "simple_udf_from_ffi([]): No kernel matching arguments"
+        );
+    }
+
+    /// The optional function name should survive an export/import roundtrip
+    #[test]
+    fn named_kernel() {
+        let kernel = SimpleSedonaScalarKernel::new_ref(
+            ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY),
+            Arc::new(|_, args| Ok(args[0].clone())),
+        );
+
+        // Without intervening, we have a None name
+        let exported_kernel = ExportedScalarKernel::from(kernel.clone());
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+        assert!(imported_kernel.function_name().is_none());
+
+        // If we set a function name, it should be roundtripped
+        let exported_kernel =
+            ExportedScalarKernel::from(kernel.clone()).with_function_name("foofy");
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+        assert_eq!(imported_kernel.function_name(), Some("foofy"));
+    }
+
+    /// Scalar argument values should reach return_type_from_args_and_scalars()
+    /// on the far side of the FFI boundary
+    #[test]
+    fn invoke_batch_from_scalar() {
+        let kernel = Arc::new(ReturnTypeFromScalars {}) as ScalarKernelRef;
+        let exported_kernel = ExportedScalarKernel::from(kernel.clone());
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+
+        let udf_from_ffi = SedonaScalarUDF::new(
+            "simple_udf_from_ffi",
+            vec![Arc::new(imported_kernel)],
+            Volatility::Immutable,
+            None,
+        );
+
+        let ffi_tester = ScalarUdfTester::new(
+            udf_from_ffi.clone().into(),
+            vec![SedonaType::Arrow(DataType::Utf8)],
+        );
+        let return_type = ffi_tester.return_type_with_scalar(Some("foofy")).unwrap();
+        assert_eq!(return_type, SedonaType::Arrow(DataType::Utf8));
+    }
+
+    /// Kernel whose return type is the data type of its first scalar argument
+    /// (or `None` when that argument is not a scalar)
+    #[derive(Debug)]
+    struct ReturnTypeFromScalars {}
+
+    impl SedonaScalarKernel for ReturnTypeFromScalars {
+        fn return_type_from_args_and_scalars(
+            &self,
+            _args: &[SedonaType],
+            scalar_args: &[Option<&ScalarValue>],
+        ) -> Result<Option<SedonaType>> {
+            if let Some(arg0) = scalar_args[0] {
+                Ok(Some(SedonaType::Arrow(arg0.data_type())))
+            } else {
+                Ok(None)
+            }
+        }
+
+        fn return_type(&self, _args: &[SedonaType]) -> Result<Option<SedonaType>> {
+            unreachable!()
+        }
+
+        fn invoke_batch(
+            &self,
+            _arg_types: &[SedonaType],
+            _args: &[ColumnarValue],
+        ) -> Result<ColumnarValue> {
+            unreachable!()
+        }
+    }
+
+    /// Errors from the exported kernel's invoke_batch() should be propagated with
+    /// their original message
+    #[test]
+    fn erroring_invoke_batch() {
+        let kernel = SimpleSedonaScalarKernel::new_ref(
+            ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY),
+            Arc::new(|_, _args| exec_err!("this invoke_batch() always errors")),
+        );
+
+        let exported_kernel = ExportedScalarKernel::from(kernel.clone());
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+
+        let udf_from_ffi = SedonaScalarUDF::new(
+            "simple_udf_from_ffi",
+            vec![Arc::new(imported_kernel)],
+            Volatility::Immutable,
+            None,
+        );
+
+        let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![WKB_GEOMETRY]);
+        ffi_tester.assert_return_type(WKB_GEOMETRY);
+
+        let err = ffi_tester.invoke_scalar("POINT (0 1)").unwrap_err();
+        assert_eq!(
+            err.message(),
+            "SedonaCScalarKernelImpl::execute failed: this invoke_batch() always errors"
+        );
+    }
+
+    /// Errors from the exported kernel's return type resolution should be
+    /// propagated with their original message
+    #[test]
+    fn erroring_return_type() {
+        let kernel = Arc::new(ErroringReturnType {}) as ScalarKernelRef;
+        let exported_kernel = ExportedScalarKernel::from(kernel.clone());
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+
+        let udf_from_ffi = SedonaScalarUDF::new(
+            "simple_udf_from_ffi",
+            vec![Arc::new(imported_kernel)],
+            Volatility::Immutable,
+            None,
+        );
+
+        let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), vec![WKB_GEOMETRY]);
+        let err = ffi_tester.return_type().unwrap_err();
+        assert_eq!(
+            err.message(),
+            "SedonaCScalarKernelImpl::init failed: this implementation of return_type always errors"
+        );
+    }
+
+    /// Kernel whose return_type() always fails
+    #[derive(Debug)]
+    struct ErroringReturnType {}
+
+    impl SedonaScalarKernel for ErroringReturnType {
+        fn return_type(&self, _args: &[SedonaType]) -> Result<Option<SedonaType>> {
+            plan_err!("this implementation of return_type always errors")
+        }
+
+        fn invoke_batch(
+            &self,
+            _arg_types: &[SedonaType],
+            _args: &[ColumnarValue],
+        ) -> Result<ColumnarValue> {
+            unreachable!()
+        }
+    }
+}
diff --git a/c/sedona-extension/src/sedona_extension.h
b/c/sedona-extension/src/sedona_extension.h
new file mode 100644
index 00000000..7191af21
--- /dev/null
+++ b/c/sedona-extension/src/sedona_extension.h
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SEDONA_EXTENSION_H
+#define SEDONA_EXTENSION_H
+
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Extra guard for versions of Arrow without the canonical guard
+#ifndef ARROW_FLAG_DICTIONARY_ORDERED
+
+#ifndef ARROW_C_DATA_INTERFACE
+#define ARROW_C_DATA_INTERFACE
+
+#define ARROW_FLAG_DICTIONARY_ORDERED 1
+#define ARROW_FLAG_NULLABLE 2
+#define ARROW_FLAG_MAP_KEYS_SORTED 4
+
+struct ArrowSchema {
+  // Array type description
+  const char* format;
+  const char* name;
+  const char* metadata;
+  int64_t flags;
+  int64_t n_children;
+  struct ArrowSchema** children;
+  struct ArrowSchema* dictionary;
+
+  // Release callback
+  void (*release)(struct ArrowSchema*);
+  // Opaque producer-specific data
+  void* private_data;
+};
+
+struct ArrowArray {
+  // Array data description
+  int64_t length;
+  int64_t null_count;
+  int64_t offset;
+  int64_t n_buffers;
+  int64_t n_children;
+  const void** buffers;
+  struct ArrowArray** children;
+  struct ArrowArray* dictionary;
+
+  // Release callback
+  void (*release)(struct ArrowArray*);
+  // Opaque producer-specific data
+  void* private_data;
+};
+
+#endif  // ARROW_C_DATA_INTERFACE
+
+#ifndef ARROW_C_STREAM_INTERFACE
+#define ARROW_C_STREAM_INTERFACE
+
+struct ArrowArrayStream {
+  // Callback to get the stream type
+  // (will be the same for all arrays in the stream).
+  //
+  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
+  //
+  // If successful, the ArrowSchema must be released independently from the stream.
+  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
+
+  // Callback to get the next array
+  // (if no error and the array is released, the stream has ended)
+  //
+  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
+  //
+  // If successful, the ArrowArray must be released independently from the stream.
+  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
+
+  // Callback to get optional detailed error information.
+  // This must only be called if the last stream operation failed
+  // with a non-0 return code.
+  //
+  // Return value: pointer to a null-terminated character array describing
+  // the last error, or NULL if no description is available.
+  //
+  // The returned pointer is only valid until the next operation on this stream
+  // (including release).
+  const char* (*get_last_error)(struct ArrowArrayStream*);
+
+  // Release callback: release the stream's own resources.
+  // Note that arrays returned by `get_next` must be individually released.
+  void (*release)(struct ArrowArrayStream*);
+
+  // Opaque producer-specific data
+  void* private_data;
+};
+
+#endif  // ARROW_C_STREAM_INTERFACE
+#endif  // ARROW_FLAG_DICTIONARY_ORDERED
+
+/// \brief Simple ABI-stable scalar function implementation
+///
+/// This object is not thread safe: callers must take care to serialize
+/// access to methods if an instance is shared across threads. In general,
+/// constructing and initializing this structure should be sufficiently
+/// cheap that it shouldn't need to be shared in this way.
+///
+/// Briefly, the SedonaCScalarKernelImpl is typically the stack-allocated
+/// structure that is not thread safe and the SedonaCScalarKernel is the
+/// value that lives in a registry (whose job it is to initialize implementations
+/// on each stack that needs one).
+struct SedonaCScalarKernelImpl {
+  /// \brief Initialize the state of this instance and calculate a return type
+  ///
+  /// The init callback either computes a return ArrowSchema or initializes the
+  /// return ArrowSchema to an explicitly released value to indicate that this
+  /// implementation does not apply to the arguments passed. An implementation
+  /// that does not apply to the arguments passed is not necessarily an error
+  /// (there may be another implementation prepared to handle such a case).
+  ///
+  /// \param arg_types Argument types
+  /// \param scalar_args An optional array of scalar arguments. The entire
+  /// array may be null to indicate that none of the arguments are scalars, or
+  /// individual items in the array may be NULL to indicate that a particular
+  /// argument is not a scalar. Any non-NULL arrays must be of length 1.
+  /// Implementations MAY take ownership over the elements of scalar_args but
+  /// are not required to do so (i.e., caller must check if these elements were
+  /// released, and must release them if needed).
+  /// \param n_args Number of elements in the arg_types and/or scalar_args arrays.
+  /// \param out Will be populated with the return type on success, or initialized
+  /// to a released value if this implementation does not apply to the arguments
+  /// passed.
+  ///
+  /// \return An errno-compatible error code, or zero on success.
+  int (*init)(struct SedonaCScalarKernelImpl* self,
+              const struct ArrowSchema* const* arg_types,
+              struct ArrowArray* const* scalar_args, int64_t n_args,
+              struct ArrowSchema* out);
+
+  /// \brief Execute a single batch
+  ///
+  /// \param args Input arguments. Input must be length one (e.g., a scalar)
+  /// or the size of the batch. Implementations must handle scalar or array
+  /// inputs.
+  /// \param n_args The number of pointers in args
+  /// \param out Will be populated with the result on success.
+  int (*execute)(struct SedonaCScalarKernelImpl* self, struct ArrowArray* const* args,
+                 int64_t n_args, int64_t n_rows, struct ArrowArray* out);
+
+  /// \brief Get the last error message
+  ///
+  /// The result is valid until the next call to a UDF method.
+  const char* (*get_last_error)(struct SedonaCScalarKernelImpl* self);
+
+  /// \brief Release this instance
+  ///
+  /// Implementations of this callback must set self->release to NULL.
+  void (*release)(struct SedonaCScalarKernelImpl* self);
+
+  /// \brief Opaque implementation-specific data
+  void* private_data;
+};
+
+/// \brief Scalar function/kernel initializer
+///
+/// Usually a SedonaCScalarKernelImpl will be used to execute a single batch
+/// (although it may be reused if a caller can serialize callback use). This
+/// structure is a factory object that initializes such objects. The
+/// SedonaCScalarKernel is designed to be thread-safe and live in a registry.
+struct SedonaCScalarKernel {
+  /// \brief Function name
+  ///
+  /// Optional function name. This is used to register the kernel with the
+  /// appropriate function when passing this kernel across a boundary.
+  const char* (*function_name)(const struct SedonaCScalarKernel* self);
+
+  /// \brief Initialize a new implementation struct
+  ///
+  /// This callback is thread safe and may be called concurrently from any
+  /// thread at any time (as long as this object is valid).
+  void (*new_impl)(const struct SedonaCScalarKernel* self,
+                   struct SedonaCScalarKernelImpl* out);
+
+  /// \brief Release this instance
+  ///
+  /// Implementations of this callback must set self->release to NULL.
+  void (*release)(struct SedonaCScalarKernel* self);
+
+  /// \brief Opaque implementation-specific data
+  void* private_data;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/rust/sedona/src/ffi.rs b/rust/sedona/src/ffi.rs
deleted file mode 100644
index e1cb1f7b..00000000
--- a/rust/sedona/src/ffi.rs
+++ /dev/null
@@ -1,515 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-use std::{any::Any, sync::Arc};
-
-use abi_stable::StableAbi;
-use arrow_schema::{DataType, Field, FieldRef, Schema};
-use datafusion::physical_plan::{expressions::Column, PhysicalExpr};
-use datafusion_common::{config::ConfigOptions, DataFusionError, Result, ScalarValue};
-use datafusion_expr::{
-    function::{AccumulatorArgs, StateFieldsArgs},
-    Accumulator, AggregateUDF, AggregateUDFImpl, ColumnarValue, ReturnFieldArgs,
-    ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
-};
-use datafusion_ffi::{
-    udaf::{FFI_AggregateUDF, ForeignAggregateUDF},
-    udf::{FFI_ScalarUDF, ForeignScalarUDF},
-};
-use sedona_common::sedona_internal_err;
-use sedona_schema::datatypes::SedonaType;
-
-use sedona_expr::{
-    aggregate_udf::{SedonaAccumulator, SedonaAccumulatorRef},
-    scalar_udf::{ScalarKernelRef, SedonaScalarKernel},
-};
-
-/// A stable struct for sharing [SedonaScalarKernel]s across FFI boundaries
-///
-/// The primary interface for importing or exporting these is `.from()`
-/// and `.into()` between the [FFI_SedonaScalarKernel] and the [ScalarKernelRef].
-///
-/// Internally this struct uses the [FFI_ScalarUDF] from DataFusion's FFI
-/// library to avoid having to invent an FFI ourselves. Like the [FFI_ScalarUDF],
-/// this struct is only convenient to use when the libraries on both sides of
-/// a boundary are written in Rust. Because Rust makes it relatively easy to
-/// wrap C or C++ libraries, this should not be a barrier for most types of
-/// kernels we might want to implement; however, it is also an option to
-/// create our own FFI using simpler primitives if using DataFusion's
-/// introduces performance or implementation issues.
-#[repr(C)]
-#[derive(Debug, StableAbi)]
-#[allow(non_camel_case_types)]
-pub struct FFI_SedonaScalarKernel {
-    inner: FFI_ScalarUDF,
-}
-
-impl From<ScalarKernelRef> for FFI_SedonaScalarKernel {
-    fn from(value: ScalarKernelRef) -> Self {
-        let exported = ScalarUDF::new_from_impl(ExportedScalarKernel::from(value));
-        FFI_SedonaScalarKernel {
-            inner: Arc::new(exported).into(),
-        }
-    }
-}
-
-impl TryFrom<&FFI_SedonaScalarKernel> for ScalarKernelRef {
-    type Error = DataFusionError;
-
-    fn try_from(value: &FFI_SedonaScalarKernel) -> Result<Self> {
-        Ok(Arc::new(ImportedScalarKernel::try_from(value)?))
-    }
-}
-
-impl TryFrom<FFI_SedonaScalarKernel> for ScalarKernelRef {
-    type Error = DataFusionError;
-
-    fn try_from(value: FFI_SedonaScalarKernel) -> Result<Self> {
-        Self::try_from(&value)
-    }
-}
-
-#[derive(Debug)]
-struct ExportedScalarKernel {
-    name: String,
-    signature: Signature,
-    sedona_impl: ScalarKernelRef,
-}
-
-impl PartialEq for ExportedScalarKernel {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-    }
-}
-
-impl Eq for ExportedScalarKernel {}
-
-impl std::hash::Hash for ExportedScalarKernel {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-    }
-}
-
-impl From<ScalarKernelRef> for ExportedScalarKernel {
-    fn from(value: ScalarKernelRef) -> Self {
-        Self {
-            name: "ExportedScalarKernel".to_string(),
-            signature: Signature::any(0, datafusion_expr::Volatility::Volatile),
-            sedona_impl: value,
-        }
-    }
-}
-
-impl ScalarUDFImpl for ExportedScalarKernel {
-    fn as_any(&self) -> &dyn std::any::Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn signature(&self) -> &Signature {
-        &self.signature
-    }
-
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        sedona_internal_err!("should not be called")
-    }
-
-    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
-        let sedona_types = args
-            .arg_fields
-            .iter()
-            .map(|f| SedonaType::from_storage_field(f))
-            .collect::<Result<Vec<_>>>()?;
-        match self.sedona_impl.return_type(&sedona_types)? {
-            Some(output_type) => Ok(output_type.to_storage_field("", true)?.into()),
-            // Sedona kernels return None to indicate the kernel doesn't apply to the inputs,
-            // but the ScalarUDFImpl doesn't have a way to natively indicate that. We use
-            // NotImplemented with a special message and catch it on the other side.
-            None => Err(DataFusionError::NotImplemented(
-                "::kernel does not match input args::".to_string(),
-            )),
-        }
-    }
-
-    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
-        let sedona_types = args
-            .arg_fields
-            .iter()
-            .map(|f| SedonaType::from_storage_field(f))
-            .collect::<Result<Vec<_>>>()?;
-        self.sedona_impl.invoke_batch(&sedona_types, &args.args)
-    }
-}
-
-#[derive(Debug)]
-struct ImportedScalarKernel {
-    udf_impl: ScalarUDF,
-}
-
-impl TryFrom<&FFI_SedonaScalarKernel> for ImportedScalarKernel {
-    type Error = DataFusionError;
-
-    fn try_from(value: &FFI_SedonaScalarKernel) -> Result<Self> {
-        let wrapped = ForeignScalarUDF::try_from(&value.inner)?;
-        Ok(Self {
-            udf_impl: ScalarUDF::new_from_impl(wrapped),
-        })
-    }
-}
-
-impl SedonaScalarKernel for ImportedScalarKernel {
-    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
-        let df_args = ReturnFieldArgs {
-            arg_fields: &args
-                .iter()
-                .map(|arg| arg.to_storage_field("", true).map(Arc::new))
-                .collect::<Result<Vec<_>>>()?,
-            scalar_arguments: &[],
-        };
-        match self.udf_impl.return_field_from_args(df_args) {
-            Ok(field) => Ok(Some(SedonaType::from_storage_field(&field)?)),
-            Err(err) => {
-                if matches!(err, DataFusionError::NotImplemented(_)) {
-                    Ok(None)
-                } else {
-                    Err(err)
-                }
-            }
-        }
-    }
-
-    fn invoke_batch(
-        &self,
-        arg_types: &[SedonaType],
-        args: &[ColumnarValue],
-    ) -> Result<ColumnarValue> {
-        let arg_rows = Self::output_size(args);
-
-        let scalar_fn_args = ScalarFunctionArgs {
-            args: args.to_vec(),
-            arg_fields: arg_types
-                .iter()
-                .map(|arg| arg.to_storage_field("", true).map(Arc::new))
-                .collect::<Result<Vec<_>>>()?,
-            number_rows: arg_rows.unwrap_or(1),
-            // Wrapper code on the other side of this doesn't use this value
-            return_field: Field::new("", DataType::Null, true).into(),
-            config_options: Arc::new(ConfigOptions::default()),
-        };
-
-        // DataFusion's FFI_ScalarUDF always returns array output but
-        // our original UDFs were careful to return ScalarValues.
-        match self.udf_impl.invoke_with_args(scalar_fn_args)? {
-            ColumnarValue::Array(array) => match arg_rows {
-                Some(_) => Ok(ColumnarValue::Array(array)),
-                None => Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
-                    &array, 0,
-                )?)),
-            },
-            ColumnarValue::Scalar(scalar_value) => {
-                // This branch is probably never taken but may in the future
-                Ok(ColumnarValue::Scalar(scalar_value))
-            }
-        }
-    }
-}
-
-impl ImportedScalarKernel {
-    fn output_size(args: &[ColumnarValue]) -> Option<usize> {
-        for original_arg in args {
-            if let ColumnarValue::Array(array) = original_arg {
-                return Some(array.len());
-            }
-        }
-        None
-    }
-}
-
-/// A stable struct for sharing [SedonaAccumulator]s across FFI boundaries
-///
-/// The primary interface for importing or exporting these is `.from()`
-/// and `.into()` between the [FFI_SedonaAggregateKernel] and the [SedonaAccumulatorRef].
-///
-/// Internally this struct uses the [FFI_AggregateUDF] from DataFusion's FFI
-/// library to avoid having to invent an FFI ourselves. See [FFI_SedonaScalarKernel]
-/// for general information about the rationale and usage of FFI implementations.
-#[repr(C)]
-#[derive(Debug, StableAbi)]
-#[allow(non_camel_case_types)]
-pub struct FFI_SedonaAggregateKernel {
-    inner: FFI_AggregateUDF,
-}
-
-impl From<SedonaAccumulatorRef> for FFI_SedonaAggregateKernel {
-    fn from(value: SedonaAccumulatorRef) -> Self {
-        let exported: AggregateUDF = ExportedSedonaAccumulator::from(value).into();
-        FFI_SedonaAggregateKernel {
-            inner: Arc::new(exported).into(),
-        }
-    }
-}
-
-impl TryFrom<&FFI_SedonaAggregateKernel> for SedonaAccumulatorRef {
-    type Error = DataFusionError;
-
-    fn try_from(value: &FFI_SedonaAggregateKernel) -> Result<Self> {
-        Ok(Arc::new(ImportedSedonaAccumulator::try_from(value)?))
-    }
-}
-
-impl TryFrom<FFI_SedonaAggregateKernel> for SedonaAccumulatorRef {
-    type Error = DataFusionError;
-
-    fn try_from(value: FFI_SedonaAggregateKernel) -> Result<Self> {
-        Self::try_from(&value)
-    }
-}
-
-#[derive(Debug)]
-struct ExportedSedonaAccumulator {
-    name: String,
-    signature: Signature,
-    sedona_impl: SedonaAccumulatorRef,
-}
-
-impl PartialEq for ExportedSedonaAccumulator {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-    }
-}
-
-impl Eq for ExportedSedonaAccumulator {}
-
-impl std::hash::Hash for ExportedSedonaAccumulator {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-    }
-}
-
-impl From<SedonaAccumulatorRef> for ExportedSedonaAccumulator {
-    fn from(value: SedonaAccumulatorRef) -> Self {
-        Self {
-            name: "ExportedSedonaAccumulator".to_string(),
-            signature: Signature::any(0, datafusion_expr::Volatility::Volatile),
-            sedona_impl: value,
-        }
-    }
-}
-
-impl AggregateUDFImpl for ExportedSedonaAccumulator {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn signature(&self) -> &Signature {
-        &self.signature
-    }
-
-    fn return_field(&self, arg_fields: &[FieldRef]) -> Result<FieldRef> {
-        let sedona_types = arg_fields
-            .iter()
-            .map(|f| SedonaType::from_storage_field(f))
-            .collect::<Result<Vec<_>>>()?;
-        match self.sedona_impl.return_type(&sedona_types)? {
-            Some(output_type) => Ok(Arc::new(output_type.to_storage_field("", true)?)),
-            // Sedona kernels return None to indicate the kernel doesn't apply to the inputs,
-            // but the ScalarUDFImpl doesn't have a way to natively indicate that. We use
-            // NotImplemented with a special message and catch it on the other side.
-            None => Err(DataFusionError::NotImplemented(
-                "::kernel does not match input args::".to_string(),
-            )),
-        }
-    }
-
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        sedona_internal_err!("This should not be called (use return_field())")
-    }
-
-    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> {
-        let arg_fields = acc_args
-            .exprs
-            .iter()
-            .map(|expr| expr.return_field(acc_args.schema))
-            .collect::<Result<Vec<_>>>()?;
-        let sedona_types = arg_fields
-            .iter()
-            .map(|f| SedonaType::from_storage_field(f))
-            .collect::<Result<Vec<_>>>()?;
-        if let Some(output_type) = self.sedona_impl.return_type(&sedona_types)? {
-            self.sedona_impl.accumulator(&sedona_types, &output_type)
-        } else {
-            Err(DataFusionError::NotImplemented(
-                "::kernel does not match input args::".to_string(),
-            ))
-        }
-    }
-
-    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
-        let sedona_types = args
-            .input_fields
-            .iter()
-            .map(|f| SedonaType::from_storage_field(f))
-            .collect::<Result<Vec<_>>>()?;
-        self.sedona_impl.state_fields(&sedona_types)
-    }
-}
-
-#[derive(Debug)]
-struct ImportedSedonaAccumulator {
-    aggregate_impl: AggregateUDF,
-}
-
-impl TryFrom<&FFI_SedonaAggregateKernel> for ImportedSedonaAccumulator {
-    type Error = DataFusionError;
-
-    fn try_from(value: &FFI_SedonaAggregateKernel) -> Result<Self> {
-        let wrapped = ForeignAggregateUDF::try_from(&value.inner)?;
-        Ok(Self {
-            aggregate_impl: wrapped.into(),
-        })
-    }
-}
-
-impl SedonaAccumulator for ImportedSedonaAccumulator {
-    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
-        let arg_fields = args
-            .iter()
-            .map(|arg| arg.to_storage_field("", true).map(Arc::new))
-            .collect::<Result<Vec<_>>>()?;
-
-        match self.aggregate_impl.return_field(&arg_fields) {
-            Ok(field) => Ok(Some(SedonaType::from_storage_field(&field)?)),
-            Err(err) => {
-                if matches!(err, DataFusionError::NotImplemented(_)) {
-                    Ok(None)
-                } else {
-                    Err(err)
-                }
-            }
-        }
-    }
-
-    fn accumulator(
-        &self,
-        args: &[SedonaType],
-        output_type: &SedonaType,
-    ) -> Result<Box<dyn Accumulator>> {
-        let arg_fields = args
-            .iter()
-            .map(|arg| arg.to_storage_field("", true).map(Arc::new))
-            .collect::<Result<Vec<_>>>()?;
-        let mock_schema = Schema::new(arg_fields);
-        let exprs = (0..mock_schema.fields().len())
-            .map(|i| -> Arc<dyn PhysicalExpr> { Arc::new(Column::new("col", i)) })
-            .collect::<Vec<_>>();
-
-        let return_field = output_type.to_storage_field("", true)?;
-
-        let args = AccumulatorArgs {
-            return_field: return_field.into(),
-            schema: &mock_schema,
-            ignore_nulls: true,
-            order_bys: &[],
-            is_reversed: false,
-            name: "",
-            is_distinct: false,
-            exprs: &exprs,
-        };
-
-        self.aggregate_impl.accumulator(args)
-    }
-
-    fn state_fields(&self, args: &[SedonaType]) -> Result<Vec<FieldRef>> {
-        let arg_fields = args
-            .iter()
-            .map(|arg| arg.to_storage_field("", true).map(Arc::new))
-            .collect::<Result<Vec<_>>>()?;
-
-        let state_field_args = StateFieldsArgs {
-            name: "",
-            input_fields: &arg_fields,
-            return_field: Arc::new(Field::new("", DataType::Null, false)),
-            ordering_fields: &[],
-            is_distinct: false,
-        };
-
-        self.aggregate_impl.state_fields(state_field_args)
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use datafusion_expr::Volatility;
-    use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel};
-    use sedona_schema::{datatypes::WKB_GEOMETRY, matchers::ArgMatcher};
-    use sedona_testing::{create::create_array, testers::ScalarUdfTester};
-
-    use super::*;
-
-    #[test]
-    fn ffi_roundtrip() {
-        let kernel = SimpleSedonaScalarKernel::new_ref(
-            ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY),
-            Arc::new(|_, args| Ok(args[0].clone())),
-        );
-
-        let array_value = create_array(&[Some("POINT (0 1)"), None], &WKB_GEOMETRY);
-
-        let udf_native = SedonaScalarUDF::new(
-            "simple_udf",
-            vec![kernel.clone()],
-            Volatility::Immutable,
-            None,
-        );
-
-        let tester = ScalarUdfTester::new(udf_native.into(), vec![WKB_GEOMETRY]);
-        tester.assert_return_type(WKB_GEOMETRY);
-
-        let result = tester.invoke_scalar("POINT (0 1)").unwrap();
-        tester.assert_scalar_result_equals(result, "POINT (0 1)");
-
-        assert_eq!(
-            &tester.invoke_array(array_value.clone()).unwrap(),
-            &array_value
-        );
-
-        let ffi_kernel = FFI_SedonaScalarKernel::from(kernel.clone());
-        let udf_from_ffi = SedonaScalarUDF::new(
-            "simple_udf_from_ffi",
-            vec![ffi_kernel.try_into().unwrap()],
-            Volatility::Immutable,
-            None,
-        );
-
-        let ffi_tester = ScalarUdfTester::new(udf_from_ffi.into(), vec![WKB_GEOMETRY]);
-        ffi_tester.assert_return_type(WKB_GEOMETRY);
-
-        let result = ffi_tester.invoke_scalar("POINT (0 1)").unwrap();
-        ffi_tester.assert_scalar_result_equals(result, "POINT (0 1)");
-
-        assert_eq!(
-            &ffi_tester.invoke_array(array_value.clone()).unwrap(),
-            &array_value
-        );
-    }
-}
diff --git a/rust/sedona/src/lib.rs b/rust/sedona/src/lib.rs
index 52b54387..9d18064c 100644
--- a/rust/sedona/src/lib.rs
+++ b/rust/sedona/src/lib.rs
@@ -17,7 +17,6 @@
mod catalog;
pub mod context;
mod exec;
-pub mod ffi;
mod object_storage;
pub mod random_geometry_provider;
pub mod reader;