This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 4da8e4ca feat(c/sedona-extension): Move FFI scalar kernel definitions 
to c/sedona-extension and write them in C (#407)
4da8e4ca is described below

commit 4da8e4ca32430125f00a106b535c047cd075172f
Author: Dewey Dunnington <[email protected]>
AuthorDate: Tue Dec 9 11:25:41 2025 -0600

    feat(c/sedona-extension): Move FFI scalar kernel definitions to 
c/sedona-extension and write them in C (#407)
---
 Cargo.lock                                     |  15 +
 Cargo.toml                                     |   1 +
 c/sedona-extension/Cargo.toml                  |  39 ++
 c/sedona-extension/src/extension.rs            | 122 ++++
 {rust/sedona => c/sedona-extension}/src/lib.rs |  12 +-
 c/sedona-extension/src/scalar_kernel.rs        | 882 +++++++++++++++++++++++++
 c/sedona-extension/src/sedona_extension.h      | 210 ++++++
 rust/sedona/src/ffi.rs                         | 515 ---------------
 rust/sedona/src/lib.rs                         |   1 -
 9 files changed, 1272 insertions(+), 525 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 15bd3d41..0295231a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4906,6 +4906,21 @@ dependencies = [
  "serde_json",
 ]
 
+[[package]]
+name = "sedona-extension"
+version = "0.2.0"
+dependencies = [
+ "arrow-array",
+ "arrow-schema",
+ "datafusion-common",
+ "datafusion-expr",
+ "libc",
+ "sedona-common",
+ "sedona-expr",
+ "sedona-schema",
+ "sedona-testing",
+]
+
 [[package]]
 name = "sedona-functions"
 version = "0.3.0"
diff --git a/Cargo.toml b/Cargo.toml
index e4372a79..9d5fb784 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -16,6 +16,7 @@
 # under the License.
 [workspace]
 members = [
+    "c/sedona-extension",
     "c/sedona-geoarrow-c",
     "c/sedona-geos",
     "c/sedona-libgpuspatial",
diff --git a/c/sedona-extension/Cargo.toml b/c/sedona-extension/Cargo.toml
new file mode 100644
index 00000000..e8f8c17c
--- /dev/null
+++ b/c/sedona-extension/Cargo.toml
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "sedona-extension"
+version.workspace = true
+authors.workspace = true
+license.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description.workspace = true
+readme.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[dependencies]
+arrow-array = { workspace = true, features = ["ffi"]}
+arrow-schema = { workspace = true, features = ["ffi"]}
+datafusion-common = { workspace = true }
+datafusion-expr = { workspace = true }
+libc = "0.2.178"
+sedona-common = { workspace = true }
+sedona-expr = { workspace = true }
+sedona-schema = { workspace = true }
+sedona-testing = { path = "../../rust/sedona-testing" }
diff --git a/c/sedona-extension/src/extension.rs 
b/c/sedona-extension/src/extension.rs
new file mode 100644
index 00000000..7d957a57
--- /dev/null
+++ b/c/sedona-extension/src/extension.rs
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    ffi::c_int,
+    os::raw::{c_char, c_void},
+    ptr::null_mut,
+};
+
+use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+
+/// Raw FFI representation of the SedonaCScalarKernel
+///
+/// See the ImportedScalarKernel and ExportedScalarKernel for high-level
+/// APIs to import and export implementations using this struct.
+#[derive(Default)]
+#[repr(C)]
+pub struct SedonaCScalarKernel {
+    pub function_name:
+        Option<unsafe extern "C" fn(self_: *const SedonaCScalarKernel) -> 
*const c_char>,
+    pub new_impl: Option<
+        unsafe extern "C" fn(self_: *const SedonaCScalarKernel, out: *mut 
SedonaCScalarKernelImpl),
+    >,
+
+    pub release: Option<unsafe extern "C" fn(self_: *mut SedonaCScalarKernel)>,
+    pub private_data: *mut c_void,
+}
+
+unsafe impl Send for SedonaCScalarKernel {}
+unsafe impl Sync for SedonaCScalarKernel {}
+
+impl Drop for SedonaCScalarKernel {
+    fn drop(&mut self) {
+        if let Some(releaser) = self.release {
+            unsafe { releaser(self) }
+            self.release = None;
+            self.private_data = null_mut();
+        }
+    }
+}
+
+/// Raw FFI representation of the SedonaCScalarKernelImpl
+#[derive(Default)]
+#[repr(C)]
+pub struct SedonaCScalarKernelImpl {
+    pub init: Option<
+        unsafe extern "C" fn(
+            self_: *mut SedonaCScalarKernelImpl,
+            arg_types: *const *const FFI_ArrowSchema,
+            scalar_args: *const *mut FFI_ArrowArray,
+            n_args: i64,
+            out: *mut FFI_ArrowSchema,
+        ) -> c_int,
+    >,
+
+    pub execute: Option<
+        unsafe extern "C" fn(
+            self_: *mut SedonaCScalarKernelImpl,
+            args: *const *mut FFI_ArrowArray,
+            n_args: i64,
+            n_rows: i64,
+            out: *mut FFI_ArrowArray,
+        ) -> c_int,
+    >,
+
+    pub get_last_error:
+        Option<unsafe extern "C" fn(self_: *mut SedonaCScalarKernelImpl) -> 
*const c_char>,
+
+    pub release: Option<unsafe extern "C" fn(self_: *mut 
SedonaCScalarKernelImpl)>,
+
+    pub private_data: *mut c_void,
+}
+
+impl Drop for SedonaCScalarKernelImpl {
+    fn drop(&mut self) {
+        if let Some(releaser) = self.release {
+            unsafe { releaser(self) }
+            self.release = None;
+            self.private_data = null_mut();
+        }
+    }
+}
+
+/// Check if a schema is valid
+///
+/// The [FFI_ArrowSchema] doesn't have the ability to check for a NULL release 
callback,
+/// so we provide a mechanism to do so here.
+pub fn ffi_arrow_schema_is_valid(schema: *const FFI_ArrowSchema) -> bool {
+    let schema_internal = schema as *const c_void as *const 
ArrowSchemaInternal;
+    if let Some(schema_ref) = unsafe { schema_internal.as_ref() } {
+        schema_ref.release.is_some()
+    } else {
+        false
+    }
+}
+
+#[repr(C)]
+struct ArrowSchemaInternal {
+    format: *const c_char,
+    name: *const c_char,
+    metadata: *const c_char,
+    flags: i64,
+    n_children: i64,
+    children: *mut *mut ArrowSchemaInternal,
+    dictionary: *mut ArrowSchemaInternal,
+    release: Option<unsafe extern "C" fn(*mut ArrowSchemaInternal)>,
+    private_data: *mut c_void,
+}
diff --git a/rust/sedona/src/lib.rs b/c/sedona-extension/src/lib.rs
similarity index 82%
copy from rust/sedona/src/lib.rs
copy to c/sedona-extension/src/lib.rs
index 52b54387..adc0a921 100644
--- a/rust/sedona/src/lib.rs
+++ b/c/sedona-extension/src/lib.rs
@@ -14,12 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-mod catalog;
-pub mod context;
-mod exec;
-pub mod ffi;
-mod object_storage;
-pub mod random_geometry_provider;
-pub mod reader;
-pub mod record_batch_reader_provider;
-pub mod show;
+
+pub(crate) mod extension;
+pub mod scalar_kernel;
diff --git a/c/sedona-extension/src/scalar_kernel.rs 
b/c/sedona-extension/src/scalar_kernel.rs
new file mode 100644
index 00000000..17972356
--- /dev/null
+++ b/c/sedona-extension/src/scalar_kernel.rs
@@ -0,0 +1,882 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::{
+    ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema},
+    make_array, ArrayRef,
+};
+use arrow_schema::{ArrowError, Field};
+use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
+use datafusion_expr::ColumnarValue;
+use sedona_common::sedona_internal_err;
+use sedona_expr::scalar_udf::{ScalarKernelRef, SedonaScalarKernel};
+use sedona_schema::datatypes::SedonaType;
+use std::{
+    ffi::{c_char, c_int, c_void, CStr, CString},
+    fmt::Debug,
+    iter::zip,
+    ptr::{null, null_mut, swap_nonoverlapping},
+    str::FromStr,
+};
+
+use crate::extension::{ffi_arrow_schema_is_valid, SedonaCScalarKernel, 
SedonaCScalarKernelImpl};
+
+/// Wrapper around a [SedonaCScalarKernel] that implements [SedonaScalarKernel]
+///
+/// This is the means by which a kernel implementation may be imported from a
+/// C implementation.
+pub struct ImportedScalarKernel {
+    inner: SedonaCScalarKernel,
+    function_name: Option<String>,
+}
+
+impl Debug for ImportedScalarKernel {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ImportedScalarKernel")
+            .field("inner", &"<SedonaCScalarKernel>")
+            .finish()
+    }
+}
+
+impl TryFrom<SedonaCScalarKernel> for ImportedScalarKernel {
+    type Error = DataFusionError;
+
+    fn try_from(value: SedonaCScalarKernel) -> Result<Self> {
+        match (&value.function_name, &value.new_impl, &value.release) {
+            (Some(function_name), Some(_), Some(_)) => {
+                let name_ptr = unsafe { function_name(&value) };
+                let name = if name_ptr.is_null() {
+                    None
+                } else {
+                    Some(
+                        unsafe { CStr::from_ptr(name_ptr) }
+                            .to_string_lossy()
+                            .into_owned(),
+                    )
+                };
+
+                Ok(Self {
+                    inner: value,
+                    function_name: name,
+                })
+            }
+            _ => sedona_internal_err!("Can't import released or uninitialized 
SedonaCScalarKernel"),
+        }
+    }
+}
+
+impl ImportedScalarKernel {
+    pub fn function_name(&self) -> Option<&str> {
+        self.function_name.as_deref()
+    }
+}
+
+impl SedonaScalarKernel for ImportedScalarKernel {
+    fn return_type_from_args_and_scalars(
+        &self,
+        args: &[SedonaType],
+        scalar_args: &[Option<&ScalarValue>],
+    ) -> Result<Option<SedonaType>> {
+        let mut inner_impl = CScalarKernelImplWrapper::try_new(&self.inner)?;
+        inner_impl.init(args, scalar_args)
+    }
+
+    fn invoke_batch_from_args(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+        return_type: &SedonaType,
+        num_rows: usize,
+    ) -> Result<ColumnarValue> {
+        let arg_scalars = args
+            .iter()
+            .map(|arg| {
+                if let ColumnarValue::Scalar(scalar) = arg {
+                    Some(scalar)
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let mut inner_impl = CScalarKernelImplWrapper::try_new(&self.inner)?;
+        inner_impl.init(arg_types, &arg_scalars)?;
+        let result_array = inner_impl.execute(args, return_type, num_rows)?;
+        for arg in args {
+            if let ColumnarValue::Array(_) = arg {
+                return Ok(ColumnarValue::Array(result_array));
+            }
+        }
+
+        if result_array.len() != 1 {
+            sedona_internal_err!(
+                "Expected scalar result but got result with length {}",
+                result_array.len()
+            )
+        } else {
+            Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
+                &result_array,
+                0,
+            )?))
+        }
+    }
+
+    fn return_type(&self, _args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        sedona_internal_err!(
+            "Should not be called because return_type_from_args_and_scalars() 
is implemented"
+        )
+    }
+
+    fn invoke_batch(
+        &self,
+        _arg_types: &[SedonaType],
+        _args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        sedona_internal_err!("Should not be called because 
invoke_batch_from_args() is implemented")
+    }
+}
+
+/// Wrapper class handling the verbose details of preparing and executing FFI 
calls
+/// for the [SedonaCScalarKernelImpl]
+struct CScalarKernelImplWrapper {
+    inner: SedonaCScalarKernelImpl,
+}
+
+impl CScalarKernelImplWrapper {
+    fn try_new(factory: &SedonaCScalarKernel) -> Result<Self> {
+        if let Some(init) = factory.new_impl {
+            let mut inner = SedonaCScalarKernelImpl::default();
+            unsafe { init(factory, &mut inner) };
+            Ok(Self { inner })
+        } else {
+            sedona_internal_err!("SedonaCScalarKernel is not valid")
+        }
+    }
+
+    fn init(
+        &mut self,
+        arg_types: &[SedonaType],
+        arg_scalars: &[Option<&ScalarValue>],
+    ) -> Result<Option<SedonaType>> {
+        if arg_types.len() != arg_scalars.len() {
+            return sedona_internal_err!("field/scalar lengths must be 
identical");
+        }
+
+        // Convert arg_types to Vec<Field>
+        let arg_fields = arg_types
+            .iter()
+            .map(|sedona_type| sedona_type.to_storage_field("", true))
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert arg types to Vec<FFI_ArrowSchema>
+        let ffi_fields = arg_fields
+            .iter()
+            .map(FFI_ArrowSchema::try_from)
+            .collect::<Result<Vec<_>, ArrowError>>()?;
+
+        // Convert arg types to Vec<*const FFI_ArrowSchema>
+        let ffi_field_ptrs = ffi_fields
+            .iter()
+            .map(|ffi_field| ffi_field as *const FFI_ArrowSchema)
+            .collect::<Vec<_>>();
+
+        // Convert arg_scalars to Vec<Option<FFI_ArrowArray>>
+        let mut ffi_scalars = arg_scalars
+            .iter()
+            .map(|maybe_scalar| {
+                if let Some(scalar) = maybe_scalar {
+                    let array = scalar.to_array()?;
+                    Ok(Some(FFI_ArrowArray::new(&array.to_data())))
+                } else {
+                    Ok(None)
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert arg_scalars to Vec<*mut FFI_ArrowArray>
+        let ffi_scalar_ptrs = ffi_scalars
+            .iter_mut()
+            .map(|maybe_ffi_scalar| match maybe_ffi_scalar {
+                Some(ffi_scalar) => ffi_scalar as *mut FFI_ArrowArray,
+                None => null_mut(),
+            })
+            .collect::<Vec<_>>();
+
+        // Call the FFI implementation of init
+        if let Some(init) = self.inner.init {
+            let mut ffi_out = FFI_ArrowSchema::empty();
+            let code = unsafe {
+                init(
+                    &mut self.inner,
+                    ffi_field_ptrs.as_ptr(),
+                    ffi_scalar_ptrs.as_ptr(),
+                    arg_types.len() as i64,
+                    &mut ffi_out,
+                )
+            };
+
+            // On success, convert the output to SedonaType. If the 
implementation
+            // returned a "released" schema, this is the equivalent to our 
return_type()
+            // returning None (for "this kernel doesn't apply").
+            // On error, query the FFI implementation for the last error 
string.
+            if code == 0 {
+                if ffi_arrow_schema_is_valid(&ffi_out) {
+                    let field = Field::try_from(&ffi_out)?;
+                    Ok(Some(SedonaType::from_storage_field(&field)?))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                plan_err!(
+                    "SedonaCScalarKernelImpl::init failed: {}",
+                    self.last_error(code)
+                )
+            }
+        } else {
+            sedona_internal_err!("Invalid SedonaCScalarKernelImpl")
+        }
+    }
+
+    fn execute(
+        &mut self,
+        args: &[ColumnarValue],
+        return_type: &SedonaType,
+        num_rows: usize,
+    ) -> Result<ArrayRef> {
+        // Convert args to Vec<ArrayRef>
+        let arg_arrays = args
+            .iter()
+            .map(|arg| match arg {
+                ColumnarValue::Array(array) => Ok(array.clone()),
+                ColumnarValue::Scalar(scalar_value) => scalar_value.to_array(),
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert args to Vec<FFI_ArrowArray>
+        let mut ffi_args = arg_arrays
+            .iter()
+            .map(|arg| FFI_ArrowArray::new(&arg.to_data()))
+            .collect::<Vec<_>>();
+        let ffi_arg_ptrs = ffi_args
+            .iter_mut()
+            .map(|arg| arg as *mut FFI_ArrowArray)
+            .collect::<Vec<_>>();
+
+        // Call the FFI implementation of execute()
+        if let Some(execute) = self.inner.execute {
+            let mut ffi_out = FFI_ArrowArray::empty();
+            let code = unsafe {
+                execute(
+                    &mut self.inner,
+                    ffi_arg_ptrs.as_ptr(),
+                    args.len() as i64,
+                    num_rows as i64,
+                    &mut ffi_out,
+                )
+            };
+
+            // On success, convert the result to an ArrayRef.
+            // On error, query the FFI implementation for the last error 
string.
+            if code == 0 {
+                let data = unsafe {
+                    arrow_array::ffi::from_ffi_and_data_type(
+                        ffi_out,
+                        return_type.storage_type().clone(),
+                    )?
+                };
+                Ok(arrow_array::make_array(data))
+            } else {
+                plan_err!(
+                    "SedonaCScalarKernelImpl::execute failed: {}",
+                    self.last_error(code)
+                )
+            }
+        } else {
+            sedona_internal_err!("Invalid SedonaCScalarKernelImpl")
+        }
+    }
+
+    /// Helper to get the last error from the FFI implementation as a Rust 
String
+    fn last_error(&mut self, code: c_int) -> String {
+        if let Some(get_last_error) = self.inner.get_last_error {
+            let c_err = unsafe { get_last_error(&mut self.inner) };
+            if c_err.is_null() {
+                format!("({code})")
+            } else {
+                unsafe { CStr::from_ptr(c_err) }
+                    .to_string_lossy()
+                    .into_owned()
+            }
+        } else {
+            "Invalid SedonaCScalarKernelImpl".to_string()
+        }
+    }
+}
+
+/// Wrapper around a [ScalarKernelRef] that may be used to export an existing
+/// kernel across an FFI boundary using the [SedonaCScalarKernel]
+pub struct ExportedScalarKernel {
+    inner: ScalarKernelRef,
+    function_name: Option<CString>,
+}
+
+impl From<ScalarKernelRef> for ExportedScalarKernel {
+    fn from(value: ScalarKernelRef) -> Self {
+        ExportedScalarKernel {
+            inner: value,
+            function_name: None,
+        }
+    }
+}
+
+impl From<ExportedScalarKernel> for SedonaCScalarKernel {
+    fn from(value: ExportedScalarKernel) -> Self {
+        let box_value = Box::new(value);
+        Self {
+            function_name: Some(c_factory_function_name),
+            new_impl: Some(c_factory_new_impl),
+            release: Some(c_factory_release),
+            private_data: Box::leak(box_value) as *mut ExportedScalarKernel as 
*mut c_void,
+        }
+    }
+}
+
+impl ExportedScalarKernel {
+    /// Add a function name to this exported kernel
+    ///
+    /// This ensures the kernel will be registered with the appropriate 
function
+    /// when passed across a boundary.
+    pub fn with_function_name(self, function_name: impl AsRef<str>) -> Self {
+        Self {
+            inner: self.inner,
+            function_name: 
Some(CString::from_str(function_name.as_ref()).unwrap()),
+        }
+    }
+
+    fn new_impl(&self) -> ExportedScalarKernelImpl {
+        ExportedScalarKernelImpl::new(self.inner.clone())
+    }
+}
+
+/// C callable wrapper to expose [ExportedScalarKernel::function_name]
+unsafe extern "C" fn c_factory_function_name(self_: *const 
SedonaCScalarKernel) -> *const c_char {
+    assert!(!self_.is_null());
+    let self_ref = self_.as_ref().unwrap();
+
+    assert!(!self_ref.private_data.is_null());
+    let private_data = (self_ref.private_data as *mut ExportedScalarKernel)
+        .as_ref()
+        .unwrap();
+    if let Some(function_name) = &private_data.function_name {
+        function_name.as_ptr()
+    } else {
+        null()
+    }
+}
+
+/// C callable wrapper around [ExportedScalarKernel::new_impl]
+unsafe extern "C" fn c_factory_new_impl(
+    self_: *const SedonaCScalarKernel,
+    out: *mut SedonaCScalarKernelImpl,
+) {
+    assert!(!self_.is_null());
+    let self_ref = self_.as_ref().unwrap();
+
+    assert!(!self_ref.private_data.is_null());
+    let private_data = (self_ref.private_data as *mut ExportedScalarKernel)
+        .as_ref()
+        .unwrap();
+    *out = SedonaCScalarKernelImpl::from(private_data.new_impl())
+}
+
+/// C Callable wrapper called when this value is dropped via FFI
+unsafe extern "C" fn c_factory_release(self_: *mut SedonaCScalarKernel) {
+    assert!(!self_.is_null());
+    let self_ref = self_.as_mut().unwrap();
+
+    assert!(!self_ref.private_data.is_null());
+    let boxed = Box::from_raw(self_ref.private_data as *mut 
ExportedScalarKernel);
+    drop(boxed);
+
+    self_ref.private_data = null_mut();
+    self_ref.release = None;
+}
+
+/// Rust-backed implementation of [SedonaCScalarKernelImpl]
+struct ExportedScalarKernelImpl {
+    inner: ScalarKernelRef,
+    last_arg_types: Option<Vec<SedonaType>>,
+    last_return_type: Option<SedonaType>,
+    last_error: CString,
+}
+
+impl From<ExportedScalarKernelImpl> for SedonaCScalarKernelImpl {
+    fn from(value: ExportedScalarKernelImpl) -> Self {
+        let box_value = Box::new(value);
+        Self {
+            init: Some(c_kernel_init),
+            execute: Some(c_kernel_execute),
+            get_last_error: Some(c_kernel_last_error),
+            release: Some(c_kernel_release),
+            private_data: Box::leak(box_value) as *mut 
ExportedScalarKernelImpl as *mut c_void,
+        }
+    }
+}
+
+impl ExportedScalarKernelImpl {
+    fn new(kernel: ScalarKernelRef) -> Self {
+        Self {
+            inner: kernel,
+            last_arg_types: None,
+            last_return_type: None,
+            last_error: CString::default(),
+        }
+    }
+
+    fn init(
+        &mut self,
+        ffi_types: &[*const FFI_ArrowSchema],
+        ffi_scalar_args: &[*mut FFI_ArrowArray],
+    ) -> Result<Option<FFI_ArrowSchema>> {
+        // Convert the input types to Vec<Field>
+        let arg_fields = ffi_types
+            .iter()
+            .map(|ptr| {
+                if let Some(ffi_schema) = unsafe { ptr.as_ref() } {
+                    Field::try_from(ffi_schema)
+                } else {
+                    Err(ArrowError::CDataInterface(
+                        "FFI_ArrowSchema is NULL".to_string(),
+                    ))
+                }
+            })
+            .collect::<Result<Vec<_>, ArrowError>>()?;
+
+        // Convert the input types to Vec<SedonaType>
+        let args = arg_fields
+            .iter()
+            .map(SedonaType::from_storage_field)
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert the scalar arguments to Vec<Option<ArrayRef>>
+        let arg_arrays = zip(ffi_scalar_args, &args)
+            .map(|(ptr, arg)| {
+                if ptr.is_null() {
+                    Ok(None)
+                } else {
+                    let owned_ffi_array = unsafe { 
FFI_ArrowArray::from_raw(*ptr) };
+                    let data = unsafe {
+                        from_ffi_and_data_type(owned_ffi_array, 
arg.storage_type().clone())?
+                    };
+                    Ok(Some(make_array(data)))
+                }
+            })
+            .collect::<Result<Vec<_>, ArrowError>>()?;
+
+        // Convert the scalar arguments to Vec<Option<ScalarValue>>
+        let scalar_args = arg_arrays
+            .iter()
+            .map(|maybe_array| {
+                if let Some(array) = maybe_array {
+                    Ok(Some(ScalarValue::try_from_array(array, 0)?))
+                } else {
+                    Ok(None)
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert the scalar arguments to Vec<Option<&ScalarValue>>
+        let scalar_arg_refs = scalar_args
+            .iter()
+            .map(|arg| arg.as_ref())
+            .collect::<Vec<_>>();
+
+        // Call the implementation
+        let maybe_return_type = self
+            .inner
+            .return_type_from_args_and_scalars(&args, &scalar_arg_refs)?;
+
+        // Convert the result to FFI_ArrowSchema (if not None)
+        let return_ffi_schema = if let Some(return_type) = &maybe_return_type {
+            let return_field = return_type.to_storage_field("", true)?;
+            let return_ffi_schema = FFI_ArrowSchema::try_from(&return_field)?;
+            Some(return_ffi_schema)
+        } else {
+            None
+        };
+
+        // Save the argument types and return type for following calls to 
execute()
+        self.last_arg_types.replace(args);
+        self.last_return_type = maybe_return_type;
+
+        Ok(return_ffi_schema)
+    }
+
+    fn execute(&self, ffi_args: &[*mut FFI_ArrowArray], num_rows: i64) -> 
Result<FFI_ArrowArray> {
+        match (&self.last_arg_types, &self.last_return_type) {
+            (Some(arg_types), Some(return_type)) => {
+                // Resolve args as Vec<ArrayRef>
+                let arg_arrays = zip(ffi_args, arg_types)
+                    .map(|(ptr, arg)| {
+                        let owned_ffi_array = unsafe { 
FFI_ArrowArray::from_raw(*ptr) };
+                        let data = unsafe {
+                            from_ffi_and_data_type(owned_ffi_array, 
arg.storage_type().clone())?
+                        };
+                        Ok(make_array(data))
+                    })
+                    .collect::<Result<Vec<_>, ArrowError>>()?;
+
+                // Resolve args as Vec<ColumnarValue>
+                let args = arg_arrays
+                    .into_iter()
+                    .map(|array| {
+                        if array.len() as i64 == num_rows {
+                            Ok(ColumnarValue::Array(array))
+                        } else {
+                            
Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
+                                &array, 0,
+                            )?))
+                        }
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+
+                // Call the implementation
+                let result_value = self.inner.invoke_batch_from_args(
+                    arg_types,
+                    &args,
+                    return_type,
+                    num_rows as usize,
+                )?;
+
+                // Convert the result to an ArrayRef
+                let result_array = match result_value {
+                    ColumnarValue::Array(array) => array,
+                    ColumnarValue::Scalar(scalar_value) => 
scalar_value.to_array()?,
+                };
+
+                // Convert the result to a FFI_ArrowArray
+                let result_ffi_array = 
FFI_ArrowArray::new(&result_array.to_data());
+                Ok(result_ffi_array)
+            }
+            _ => {
+                sedona_internal_err!("Call to ExportedScalarKernel::execute() 
before init()")
+            }
+        }
+    }
+}
+
+/// C callable wrapper around [ExportedScalarKernelImpl::init]
+unsafe extern "C" fn c_kernel_init(
+    self_: *mut SedonaCScalarKernelImpl,
+    arg_types: *const *const FFI_ArrowSchema,
+    scalar_args: *const *mut FFI_ArrowArray,
+    n_args: i64,
+    out: *mut FFI_ArrowSchema,
+) -> c_int {
+    assert!(!self_.is_null());
+    let self_ref = self_.as_ref().unwrap();
+
+    assert!(!self_ref.private_data.is_null());
+    let private_data = (self_ref.private_data as *mut ExportedScalarKernelImpl)
+        .as_mut()
+        .unwrap();
+
+    let ffi_types = std::slice::from_raw_parts(arg_types, n_args as usize);
+    let ffi_scalar_args = std::slice::from_raw_parts(scalar_args, n_args as 
usize);
+
+    match private_data.init(ffi_types, ffi_scalar_args) {
+        Ok(Some(mut return_ffi_schema)) => {
+            swap_nonoverlapping(&mut return_ffi_schema as *mut _, out, 1);
+            0
+        }
+        Ok(None) => {
+            *out = FFI_ArrowSchema::empty();
+            0
+        }
+        Err(err) => {
+            private_data.last_error =
+                
CString::from_str(&err.message()).unwrap_or(CString::default());
+            libc::EINVAL
+        }
+    }
+}
+
+/// C callable wrapper around [ExportedScalarKernelImpl::execute]
+unsafe extern "C" fn c_kernel_execute(
+    self_: *mut SedonaCScalarKernelImpl,
+    args: *const *mut FFI_ArrowArray,
+    n_args: i64,
+    n_rows: i64,
+    out: *mut FFI_ArrowArray,
+) -> c_int {
+    assert!(!self_.is_null());
+    let self_ref = self_.as_ref().unwrap();
+
+    assert!(!self_ref.private_data.is_null());
+    let private_data = (self_ref.private_data as *mut ExportedScalarKernelImpl)
+        .as_mut()
+        .unwrap();
+
+    let ffi_args = std::slice::from_raw_parts(args, n_args as usize);
+    match private_data.execute(ffi_args, n_rows) {
+        Ok(mut ffi_array) => {
+            swap_nonoverlapping(&mut ffi_array as *mut _, out, 1);
+            0
+        }
+        Err(err) => {
+            private_data.last_error =
+                
CString::from_str(&err.message()).unwrap_or(CString::default());
+            libc::EINVAL
+        }
+    }
+}
+
+/// C Callable wrapper to retrieve the last error string
+unsafe extern "C" fn c_kernel_last_error(self_: *mut SedonaCScalarKernelImpl) 
-> *const c_char {
+    assert!(!self_.is_null());
+    let self_ref = self_.as_ref().unwrap();
+
+    assert!(!self_ref.private_data.is_null());
+    let private_data = (self_ref.private_data as *mut ExportedScalarKernelImpl)
+        .as_ref()
+        .unwrap();
+    private_data.last_error.as_ptr()
+}
+
+/// C Callable wrapper called when this value is dropped via FFI
+unsafe extern "C" fn c_kernel_release(self_: *mut SedonaCScalarKernelImpl) {
+    assert!(!self_.is_null());
+    let self_ref = self_.as_mut().unwrap();
+
+    assert!(!self_ref.private_data.is_null());
+    let boxed = Box::from_raw(self_ref.private_data as *mut 
ExportedScalarKernelImpl);
+    drop(boxed);
+
+    self_ref.private_data = null_mut();
+    self_ref.release = None;
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::Arc;
+
+    use arrow_schema::DataType;
+    use datafusion_common::exec_err;
+    use datafusion_expr::Volatility;
+    use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel};
+    use sedona_schema::{datatypes::WKB_GEOMETRY, matchers::ArgMatcher};
+    use sedona_testing::{create::create_array, testers::ScalarUdfTester};
+
+    use super::*;
+
+    #[test]
+    fn ffi_roundtrip() {
+        let kernel = SimpleSedonaScalarKernel::new_ref(
+            ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY),
+            Arc::new(|_, args| Ok(args[0].clone())),
+        );
+
+        let array_value = create_array(&[Some("POINT (0 1)"), None], 
&WKB_GEOMETRY);
+
+        let udf_native = SedonaScalarUDF::new(
+            "simple_udf",
+            vec![kernel.clone()],
+            Volatility::Immutable,
+            None,
+        );
+
+        let tester = ScalarUdfTester::new(udf_native.into(), 
vec![WKB_GEOMETRY]);
+        tester.assert_return_type(WKB_GEOMETRY);
+
+        let result = tester.invoke_scalar("POINT (0 1)").unwrap();
+        tester.assert_scalar_result_equals(result, "POINT (0 1)");
+
+        assert_eq!(
+            &tester.invoke_array(array_value.clone()).unwrap(),
+            &array_value
+        );
+
+        let exported_kernel = ExportedScalarKernel::from(kernel.clone());
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = 
ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+
+        let udf_from_ffi = SedonaScalarUDF::new(
+            "simple_udf_from_ffi",
+            vec![Arc::new(imported_kernel)],
+            Volatility::Immutable,
+            None,
+        );
+
+        let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), 
vec![WKB_GEOMETRY]);
+        ffi_tester.assert_return_type(WKB_GEOMETRY);
+
+        let result = ffi_tester.invoke_scalar("POINT (0 1)").unwrap();
+        ffi_tester.assert_scalar_result_equals(result, "POINT (0 1)");
+
+        assert_eq!(
+            &ffi_tester.invoke_array(array_value.clone()).unwrap(),
+            &array_value
+        );
+
+        // Check the case of a kernel that does not apply to input arguments
+        let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), 
vec![]);
+        let err = ffi_tester.return_type().unwrap_err();
+        assert_eq!(
+            err.message(),
+            "simple_udf_from_ffi([]): No kernel matching arguments"
+        );
+    }
+
+    #[test]
+    fn named_kernel() {
+        let kernel = SimpleSedonaScalarKernel::new_ref(
+            ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY),
+            Arc::new(|_, args| Ok(args[0].clone())),
+        );
+
+        // Without intervening, we have a None name
+        let exported_kernel = ExportedScalarKernel::from(kernel.clone());
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = 
ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+        assert!(imported_kernel.function_name().is_none());
+
+        // If we set a function name, it should be roundtripped
+        let exported_kernel =
+            
ExportedScalarKernel::from(kernel.clone()).with_function_name("foofy");
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = 
ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+        assert_eq!(imported_kernel.function_name(), Some("foofy"));
+    }
+
+    #[test]
+    fn invoke_batch_from_scalar() {
+        let kernel = Arc::new(ReturnTypeFromScalars {}) as ScalarKernelRef;
+        let exported_kernel = ExportedScalarKernel::from(kernel.clone());
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = 
ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+
+        let udf_from_ffi = SedonaScalarUDF::new(
+            "simple_udf_from_ffi",
+            vec![Arc::new(imported_kernel)],
+            Volatility::Immutable,
+            None,
+        );
+
+        let ffi_tester = ScalarUdfTester::new(
+            udf_from_ffi.clone().into(),
+            vec![SedonaType::Arrow(DataType::Utf8)],
+        );
+        let return_type = 
ffi_tester.return_type_with_scalar(Some("foofy")).unwrap();
+        assert_eq!(return_type, SedonaType::Arrow(DataType::Utf8));
+    }
+
+    #[derive(Debug)]
+    struct ReturnTypeFromScalars {}
+
+    impl SedonaScalarKernel for ReturnTypeFromScalars {
+        fn return_type_from_args_and_scalars(
+            &self,
+            _args: &[SedonaType],
+            scalar_args: &[Option<&ScalarValue>],
+        ) -> Result<Option<SedonaType>> {
+            if let Some(arg0) = scalar_args[0] {
+                Ok(Some(SedonaType::Arrow(arg0.data_type())))
+            } else {
+                Ok(None)
+            }
+        }
+
+        fn return_type(&self, _args: &[SedonaType]) -> 
Result<Option<SedonaType>> {
+            unreachable!()
+        }
+
+        fn invoke_batch(
+            &self,
+            _arg_types: &[SedonaType],
+            _args: &[ColumnarValue],
+        ) -> Result<ColumnarValue> {
+            unreachable!()
+        }
+    }
+
+    #[test]
+    fn erroring_invoke_batch() {
+        let kernel = SimpleSedonaScalarKernel::new_ref(
+            ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY),
+            Arc::new(|_, _args| exec_err!("this invoke_batch() always 
errors")),
+        );
+
+        let exported_kernel = ExportedScalarKernel::from(kernel.clone());
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = 
ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+
+        let udf_from_ffi = SedonaScalarUDF::new(
+            "simple_udf_from_ffi",
+            vec![Arc::new(imported_kernel)],
+            Volatility::Immutable,
+            None,
+        );
+
+        let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), 
vec![WKB_GEOMETRY]);
+        ffi_tester.assert_return_type(WKB_GEOMETRY);
+
+        let err = ffi_tester.invoke_scalar("POINT (0 1)").unwrap_err();
+        assert_eq!(
+            err.message(),
+            "SedonaCScalarKernelImpl::execute failed: this invoke_batch() 
always errors"
+        );
+    }
+
+    #[test]
+    fn erroring_return_type() {
+        let kernel = Arc::new(ErroringReturnType {}) as ScalarKernelRef;
+        let exported_kernel = ExportedScalarKernel::from(kernel.clone());
+        let ffi_kernel = SedonaCScalarKernel::from(exported_kernel);
+        let imported_kernel = 
ImportedScalarKernel::try_from(ffi_kernel).unwrap();
+
+        let udf_from_ffi = SedonaScalarUDF::new(
+            "simple_udf_from_ffi",
+            vec![Arc::new(imported_kernel)],
+            Volatility::Immutable,
+            None,
+        );
+
+        let ffi_tester = ScalarUdfTester::new(udf_from_ffi.clone().into(), 
vec![WKB_GEOMETRY]);
+        let err = ffi_tester.return_type().unwrap_err();
+        assert_eq!(
+            err.message(),
+            "SedonaCScalarKernelImpl::init failed: this implementation of 
return_type always errors"
+        );
+    }
+
+    #[derive(Debug)]
+    struct ErroringReturnType {}
+
+    impl SedonaScalarKernel for ErroringReturnType {
+        fn return_type(&self, _args: &[SedonaType]) -> 
Result<Option<SedonaType>> {
+            plan_err!("this implementation of return_type always errors")
+        }
+
+        fn invoke_batch(
+            &self,
+            _arg_types: &[SedonaType],
+            _args: &[ColumnarValue],
+        ) -> Result<ColumnarValue> {
+            unreachable!()
+        }
+    }
+}
diff --git a/c/sedona-extension/src/sedona_extension.h 
b/c/sedona-extension/src/sedona_extension.h
new file mode 100644
index 00000000..7191af21
--- /dev/null
+++ b/c/sedona-extension/src/sedona_extension.h
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef SEDONA_EXTENSION_H
+#define SEDONA_EXTENSION_H
+
+#include <stdint.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+// Extra guard for versions of Arrow without the canonical guard
+#ifndef ARROW_FLAG_DICTIONARY_ORDERED
+
+#ifndef ARROW_C_DATA_INTERFACE
+#define ARROW_C_DATA_INTERFACE
+
+#define ARROW_FLAG_DICTIONARY_ORDERED 1
+#define ARROW_FLAG_NULLABLE 2
+#define ARROW_FLAG_MAP_KEYS_SORTED 4
+
+struct ArrowSchema {
+  // Array type description
+  const char* format;
+  const char* name;
+  const char* metadata;
+  int64_t flags;
+  int64_t n_children;
+  struct ArrowSchema** children;
+  struct ArrowSchema* dictionary;
+
+  // Release callback
+  void (*release)(struct ArrowSchema*);
+  // Opaque producer-specific data
+  void* private_data;
+};
+
+struct ArrowArray {
+  // Array data description
+  int64_t length;
+  int64_t null_count;
+  int64_t offset;
+  int64_t n_buffers;
+  int64_t n_children;
+  const void** buffers;
+  struct ArrowArray** children;
+  struct ArrowArray* dictionary;
+
+  // Release callback
+  void (*release)(struct ArrowArray*);
+  // Opaque producer-specific data
+  void* private_data;
+};
+
+#endif  // ARROW_C_DATA_INTERFACE
+
+#ifndef ARROW_C_STREAM_INTERFACE
+#define ARROW_C_STREAM_INTERFACE
+
+struct ArrowArrayStream {
+  // Callback to get the stream type
+  // (will be the same for all arrays in the stream).
+  //
+  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
+  //
+  // If successful, the ArrowSchema must be released independently from the 
stream.
+  int (*get_schema)(struct ArrowArrayStream*, struct ArrowSchema* out);
+
+  // Callback to get the next array
+  // (if no error and the array is released, the stream has ended)
+  //
+  // Return value: 0 if successful, an `errno`-compatible error code otherwise.
+  //
+  // If successful, the ArrowArray must be released independently from the 
stream.
+  int (*get_next)(struct ArrowArrayStream*, struct ArrowArray* out);
+
+  // Callback to get optional detailed error information.
+  // This must only be called if the last stream operation failed
+  // with a non-0 return code.
+  //
+  // Return value: pointer to a null-terminated character array describing
+  // the last error, or NULL if no description is available.
+  //
+  // The returned pointer is only valid until the next operation on this stream
+  // (including release).
+  const char* (*get_last_error)(struct ArrowArrayStream*);
+
+  // Release callback: release the stream's own resources.
+  // Note that arrays returned by `get_next` must be individually released.
+  void (*release)(struct ArrowArrayStream*);
+
+  // Opaque producer-specific data
+  void* private_data;
+};
+
+#endif  // ARROW_C_STREAM_INTERFACE
+#endif  // ARROW_FLAG_DICTIONARY_ORDERED
+
+/// \brief Simple ABI-stable scalar function implementation
+///
+/// This object is not thread safe: callers must take care to serialize
+/// access to methods if an instance is shared across threads. In general,
+/// constructing and initializing this structure should be sufficiently
+/// cheap that it shouldn't need to be shared in this way.
+///
+/// Briefly, the SedonaCScalarKernelImpl is typically the stack-allocated
+/// structure that is not thread safe and the SedonaCScalarKernel is the
+/// value that lives in a registry (whose job it is to initialize 
implementations
+/// on each stack that needs one).
+struct SedonaCScalarKernelImpl {
+  /// \brief Initialize the state of this instance and calculate a return type
+  ///
+  /// The init callback either computes a return ArrowSchema or initializes the
+  /// return ArrowSchema to an explicitly released value to indicate that this
+  /// implementation does not apply to the arguments passed. An implementation
+  /// that does not apply to the arguments passed is not necessarily an error
+  /// (there may be another implementation prepared to handle such a case).
+  ///
+  /// \param arg_types Argument types
+  /// \param scalar_args An optional array of scalar arguments. The entire
+  /// array may be null to indicate that none of the arguments are scalars, or
+  /// individual items in the array may be NULL to indicate that a particular
+  /// argument is not a scalar. Any non-NULL arrays must be of length 1.
+  /// Implementations MAY take ownership over the elements of scalar_args but
+  /// are not required to do so (i.e., caller must check if these elements were
+  /// released, and must release them if needed).
+  /// \param n_args Number of elements in the arg_types and/or scalar_args 
arrays.
+  /// \param out Will be populated with the return type on success, or 
initialized
+  /// to a released value if this implementation does not apply to the 
arguments
+  /// passed.
+  ///
+  /// \return An errno-compatible error code, or zero on success.
+  int (*init)(struct SedonaCScalarKernelImpl* self,
+              const struct ArrowSchema* const* arg_types,
+              struct ArrowArray* const* scalar_args, int64_t n_args,
+              struct ArrowSchema* out);
+
+  /// \brief Execute a single batch
+  ///
+  /// \param args Input arguments. Input must be length one (e.g., a scalar)
+  /// or the size of the batch. Implementations must handle scalar or array
+  /// inputs.
+  /// \param n_args The number of pointers in args
+  /// \param out Will be populated with the result on success.
+  int (*execute)(struct SedonaCScalarKernelImpl* self, struct ArrowArray* 
const* args,
+                 int64_t n_args, int64_t n_rows, struct ArrowArray* out);
+
+  /// \brief Get the last error message
+  ///
+  /// The result is valid until the next call to a UDF method.
+  const char* (*get_last_error)(struct SedonaCScalarKernelImpl* self);
+
+  /// \brief Release this instance
+  ///
+  /// Implementations of this callback must set self->release to NULL.
+  void (*release)(struct SedonaCScalarKernelImpl* self);
+
+  /// \brief Opaque implementation-specific data
+  void* private_data;
+};
+
+/// \brief Scalar function/kernel initializer
+///
+/// Usually a SedonaCScalarKernelImpl will be used to execute a single batch
+/// (although it may be reused if a caller can serialize callback use). This
+/// structure is a factory object that initializes such objects. The
+/// SedonaCScalarKernel is designed to be thread-safe and live in a registry.
+struct SedonaCScalarKernel {
+  /// \brief Function name
+  ///
+  /// Optional function name. This is used to register the kernel with the
+  /// appropriate function when passing this kernel across a boundary.
+  const char* (*function_name)(const struct SedonaCScalarKernel* self);
+
+  /// \brief Initialize a new implementation struct
+  ///
+  /// This callback is thread safe and may be called concurrently from any
+  /// thread at any time (as long as this object is valid).
+  void (*new_impl)(const struct SedonaCScalarKernel* self,
+                   struct SedonaCScalarKernelImpl* out);
+
+  /// \brief Release this instance
+  ///
+  /// Implementations of this callback must set self->release to NULL.
+  void (*release)(struct SedonaCScalarKernel* self);
+
+  /// \brief Opaque implementation-specific data
+  void* private_data;
+};
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
diff --git a/rust/sedona/src/ffi.rs b/rust/sedona/src/ffi.rs
deleted file mode 100644
index e1cb1f7b..00000000
--- a/rust/sedona/src/ffi.rs
+++ /dev/null
@@ -1,515 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-use std::{any::Any, sync::Arc};
-
-use abi_stable::StableAbi;
-use arrow_schema::{DataType, Field, FieldRef, Schema};
-use datafusion::physical_plan::{expressions::Column, PhysicalExpr};
-use datafusion_common::{config::ConfigOptions, DataFusionError, Result, 
ScalarValue};
-use datafusion_expr::{
-    function::{AccumulatorArgs, StateFieldsArgs},
-    Accumulator, AggregateUDF, AggregateUDFImpl, ColumnarValue, 
ReturnFieldArgs,
-    ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
-};
-use datafusion_ffi::{
-    udaf::{FFI_AggregateUDF, ForeignAggregateUDF},
-    udf::{FFI_ScalarUDF, ForeignScalarUDF},
-};
-use sedona_common::sedona_internal_err;
-use sedona_schema::datatypes::SedonaType;
-
-use sedona_expr::{
-    aggregate_udf::{SedonaAccumulator, SedonaAccumulatorRef},
-    scalar_udf::{ScalarKernelRef, SedonaScalarKernel},
-};
-
-/// A stable struct for sharing [SedonaScalarKernel]s across FFI boundaries
-///
-/// The primary interface for importing or exporting these is `.from()`
-/// and `.into()` between the [FFI_SedonaScalarKernel] and the 
[ScalarKernelRef].
-///
-/// Internally this struct uses the [FFI_ScalarUDF] from DataFusion's FFI
-/// library to avoid having to invent an FFI ourselves. Like the 
[FFI_ScalarUDF],
-/// this struct is only convenient to use when the libraries on both sides of
-/// a boundary are written in Rust. Because Rust makes it relatively easy to
-/// wrap C or C++ libraries, this should not be a barrier for most types of
-/// kernels we might want to implement; however, it is also an option to
-/// create our own FFI using simpler primitives if using DataFusion's
-/// introduces performance or implementation issues.
-#[repr(C)]
-#[derive(Debug, StableAbi)]
-#[allow(non_camel_case_types)]
-pub struct FFI_SedonaScalarKernel {
-    inner: FFI_ScalarUDF,
-}
-
-impl From<ScalarKernelRef> for FFI_SedonaScalarKernel {
-    fn from(value: ScalarKernelRef) -> Self {
-        let exported = 
ScalarUDF::new_from_impl(ExportedScalarKernel::from(value));
-        FFI_SedonaScalarKernel {
-            inner: Arc::new(exported).into(),
-        }
-    }
-}
-
-impl TryFrom<&FFI_SedonaScalarKernel> for ScalarKernelRef {
-    type Error = DataFusionError;
-
-    fn try_from(value: &FFI_SedonaScalarKernel) -> Result<Self> {
-        Ok(Arc::new(ImportedScalarKernel::try_from(value)?))
-    }
-}
-
-impl TryFrom<FFI_SedonaScalarKernel> for ScalarKernelRef {
-    type Error = DataFusionError;
-
-    fn try_from(value: FFI_SedonaScalarKernel) -> Result<Self> {
-        Self::try_from(&value)
-    }
-}
-
-#[derive(Debug)]
-struct ExportedScalarKernel {
-    name: String,
-    signature: Signature,
-    sedona_impl: ScalarKernelRef,
-}
-
-impl PartialEq for ExportedScalarKernel {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-    }
-}
-
-impl Eq for ExportedScalarKernel {}
-
-impl std::hash::Hash for ExportedScalarKernel {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-    }
-}
-
-impl From<ScalarKernelRef> for ExportedScalarKernel {
-    fn from(value: ScalarKernelRef) -> Self {
-        Self {
-            name: "ExportedScalarKernel".to_string(),
-            signature: Signature::any(0, 
datafusion_expr::Volatility::Volatile),
-            sedona_impl: value,
-        }
-    }
-}
-
-impl ScalarUDFImpl for ExportedScalarKernel {
-    fn as_any(&self) -> &dyn std::any::Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn signature(&self) -> &Signature {
-        &self.signature
-    }
-
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        sedona_internal_err!("should not be called")
-    }
-
-    fn return_field_from_args(&self, args: ReturnFieldArgs) -> 
Result<FieldRef> {
-        let sedona_types = args
-            .arg_fields
-            .iter()
-            .map(|f| SedonaType::from_storage_field(f))
-            .collect::<Result<Vec<_>>>()?;
-        match self.sedona_impl.return_type(&sedona_types)? {
-            Some(output_type) => Ok(output_type.to_storage_field("", 
true)?.into()),
-            // Sedona kernels return None to indicate the kernel doesn't apply 
to the inputs,
-            // but the ScalarUDFImpl doesn't have a way to natively indicate 
that. We use
-            // NotImplemented with a special message and catch it on the other 
side.
-            None => Err(DataFusionError::NotImplemented(
-                "::kernel does not match input args::".to_string(),
-            )),
-        }
-    }
-
-    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> 
Result<ColumnarValue> {
-        let sedona_types = args
-            .arg_fields
-            .iter()
-            .map(|f| SedonaType::from_storage_field(f))
-            .collect::<Result<Vec<_>>>()?;
-        self.sedona_impl.invoke_batch(&sedona_types, &args.args)
-    }
-}
-
-#[derive(Debug)]
-struct ImportedScalarKernel {
-    udf_impl: ScalarUDF,
-}
-
-impl TryFrom<&FFI_SedonaScalarKernel> for ImportedScalarKernel {
-    type Error = DataFusionError;
-
-    fn try_from(value: &FFI_SedonaScalarKernel) -> Result<Self> {
-        let wrapped = ForeignScalarUDF::try_from(&value.inner)?;
-        Ok(Self {
-            udf_impl: ScalarUDF::new_from_impl(wrapped),
-        })
-    }
-}
-
-impl SedonaScalarKernel for ImportedScalarKernel {
-    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
-        let df_args = ReturnFieldArgs {
-            arg_fields: &args
-                .iter()
-                .map(|arg| arg.to_storage_field("", true).map(Arc::new))
-                .collect::<Result<Vec<_>>>()?,
-            scalar_arguments: &[],
-        };
-        match self.udf_impl.return_field_from_args(df_args) {
-            Ok(field) => Ok(Some(SedonaType::from_storage_field(&field)?)),
-            Err(err) => {
-                if matches!(err, DataFusionError::NotImplemented(_)) {
-                    Ok(None)
-                } else {
-                    Err(err)
-                }
-            }
-        }
-    }
-
-    fn invoke_batch(
-        &self,
-        arg_types: &[SedonaType],
-        args: &[ColumnarValue],
-    ) -> Result<ColumnarValue> {
-        let arg_rows = Self::output_size(args);
-
-        let scalar_fn_args = ScalarFunctionArgs {
-            args: args.to_vec(),
-            arg_fields: arg_types
-                .iter()
-                .map(|arg| arg.to_storage_field("", true).map(Arc::new))
-                .collect::<Result<Vec<_>>>()?,
-            number_rows: arg_rows.unwrap_or(1),
-            // Wrapper code on the other side of this doesn't use this value
-            return_field: Field::new("", DataType::Null, true).into(),
-            config_options: Arc::new(ConfigOptions::default()),
-        };
-
-        // DataFusion's FFI_ScalarUDF always returns array output but
-        // our original UDFs were careful to return ScalarValues.
-        match self.udf_impl.invoke_with_args(scalar_fn_args)? {
-            ColumnarValue::Array(array) => match arg_rows {
-                Some(_) => Ok(ColumnarValue::Array(array)),
-                None => Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
-                    &array, 0,
-                )?)),
-            },
-            ColumnarValue::Scalar(scalar_value) => {
-                // This branch is probably never taken but may in the future
-                Ok(ColumnarValue::Scalar(scalar_value))
-            }
-        }
-    }
-}
-
-impl ImportedScalarKernel {
-    fn output_size(args: &[ColumnarValue]) -> Option<usize> {
-        for original_arg in args {
-            if let ColumnarValue::Array(array) = original_arg {
-                return Some(array.len());
-            }
-        }
-        None
-    }
-}
-
-/// A stable struct for sharing [SedonaAccumulator]s across FFI boundaries
-///
-/// The primary interface for importing or exporting these is `.from()`
-/// and `.into()` between the [FFI_SedonaAggregateKernel] and the 
[SedonaAccumulatorRef].
-///
-/// Internally this struct uses the [FFI_AggregateUDF] from DataFusion's FFI
-/// library to avoid having to invent an FFI ourselves. See 
[FFI_SedonaScalarKernel]
-/// for general information about the rationale and usage of FFI 
implementations.
-#[repr(C)]
-#[derive(Debug, StableAbi)]
-#[allow(non_camel_case_types)]
-pub struct FFI_SedonaAggregateKernel {
-    inner: FFI_AggregateUDF,
-}
-
-impl From<SedonaAccumulatorRef> for FFI_SedonaAggregateKernel {
-    fn from(value: SedonaAccumulatorRef) -> Self {
-        let exported: AggregateUDF = 
ExportedSedonaAccumulator::from(value).into();
-        FFI_SedonaAggregateKernel {
-            inner: Arc::new(exported).into(),
-        }
-    }
-}
-
-impl TryFrom<&FFI_SedonaAggregateKernel> for SedonaAccumulatorRef {
-    type Error = DataFusionError;
-
-    fn try_from(value: &FFI_SedonaAggregateKernel) -> Result<Self> {
-        Ok(Arc::new(ImportedSedonaAccumulator::try_from(value)?))
-    }
-}
-
-impl TryFrom<FFI_SedonaAggregateKernel> for SedonaAccumulatorRef {
-    type Error = DataFusionError;
-
-    fn try_from(value: FFI_SedonaAggregateKernel) -> Result<Self> {
-        Self::try_from(&value)
-    }
-}
-
-#[derive(Debug)]
-struct ExportedSedonaAccumulator {
-    name: String,
-    signature: Signature,
-    sedona_impl: SedonaAccumulatorRef,
-}
-
-impl PartialEq for ExportedSedonaAccumulator {
-    fn eq(&self, other: &Self) -> bool {
-        self.name == other.name
-    }
-}
-
-impl Eq for ExportedSedonaAccumulator {}
-
-impl std::hash::Hash for ExportedSedonaAccumulator {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
-        self.name.hash(state);
-    }
-}
-
-impl From<SedonaAccumulatorRef> for ExportedSedonaAccumulator {
-    fn from(value: SedonaAccumulatorRef) -> Self {
-        Self {
-            name: "ExportedSedonaAccumulator".to_string(),
-            signature: Signature::any(0, 
datafusion_expr::Volatility::Volatile),
-            sedona_impl: value,
-        }
-    }
-}
-
-impl AggregateUDFImpl for ExportedSedonaAccumulator {
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-
-    fn signature(&self) -> &Signature {
-        &self.signature
-    }
-
-    fn return_field(&self, arg_fields: &[FieldRef]) -> Result<FieldRef> {
-        let sedona_types = arg_fields
-            .iter()
-            .map(|f| SedonaType::from_storage_field(f))
-            .collect::<Result<Vec<_>>>()?;
-        match self.sedona_impl.return_type(&sedona_types)? {
-            Some(output_type) => Ok(Arc::new(output_type.to_storage_field("", 
true)?)),
-            // Sedona kernels return None to indicate the kernel doesn't apply 
to the inputs,
-            // but the ScalarUDFImpl doesn't have a way to natively indicate 
that. We use
-            // NotImplemented with a special message and catch it on the other 
side.
-            None => Err(DataFusionError::NotImplemented(
-                "::kernel does not match input args::".to_string(),
-            )),
-        }
-    }
-
-    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        sedona_internal_err!("This should not be called (use return_field())")
-    }
-
-    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn 
Accumulator>> {
-        let arg_fields = acc_args
-            .exprs
-            .iter()
-            .map(|expr| expr.return_field(acc_args.schema))
-            .collect::<Result<Vec<_>>>()?;
-        let sedona_types = arg_fields
-            .iter()
-            .map(|f| SedonaType::from_storage_field(f))
-            .collect::<Result<Vec<_>>>()?;
-        if let Some(output_type) = 
self.sedona_impl.return_type(&sedona_types)? {
-            self.sedona_impl.accumulator(&sedona_types, &output_type)
-        } else {
-            Err(DataFusionError::NotImplemented(
-                "::kernel does not match input args::".to_string(),
-            ))
-        }
-    }
-
-    fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
-        let sedona_types = args
-            .input_fields
-            .iter()
-            .map(|f| SedonaType::from_storage_field(f))
-            .collect::<Result<Vec<_>>>()?;
-        self.sedona_impl.state_fields(&sedona_types)
-    }
-}
-
-#[derive(Debug)]
-struct ImportedSedonaAccumulator {
-    aggregate_impl: AggregateUDF,
-}
-
-impl TryFrom<&FFI_SedonaAggregateKernel> for ImportedSedonaAccumulator {
-    type Error = DataFusionError;
-
-    fn try_from(value: &FFI_SedonaAggregateKernel) -> Result<Self> {
-        let wrapped = ForeignAggregateUDF::try_from(&value.inner)?;
-        Ok(Self {
-            aggregate_impl: wrapped.into(),
-        })
-    }
-}
-
-impl SedonaAccumulator for ImportedSedonaAccumulator {
-    fn return_type(&self, args: &[SedonaType]) -> Result<Option<SedonaType>> {
-        let arg_fields = args
-            .iter()
-            .map(|arg| arg.to_storage_field("", true).map(Arc::new))
-            .collect::<Result<Vec<_>>>()?;
-
-        match self.aggregate_impl.return_field(&arg_fields) {
-            Ok(field) => Ok(Some(SedonaType::from_storage_field(&field)?)),
-            Err(err) => {
-                if matches!(err, DataFusionError::NotImplemented(_)) {
-                    Ok(None)
-                } else {
-                    Err(err)
-                }
-            }
-        }
-    }
-
-    fn accumulator(
-        &self,
-        args: &[SedonaType],
-        output_type: &SedonaType,
-    ) -> Result<Box<dyn Accumulator>> {
-        let arg_fields = args
-            .iter()
-            .map(|arg| arg.to_storage_field("", true).map(Arc::new))
-            .collect::<Result<Vec<_>>>()?;
-        let mock_schema = Schema::new(arg_fields);
-        let exprs = (0..mock_schema.fields().len())
-            .map(|i| -> Arc<dyn PhysicalExpr> { Arc::new(Column::new("col", 
i)) })
-            .collect::<Vec<_>>();
-
-        let return_field = output_type.to_storage_field("", true)?;
-
-        let args = AccumulatorArgs {
-            return_field: return_field.into(),
-            schema: &mock_schema,
-            ignore_nulls: true,
-            order_bys: &[],
-            is_reversed: false,
-            name: "",
-            is_distinct: false,
-            exprs: &exprs,
-        };
-
-        self.aggregate_impl.accumulator(args)
-    }
-
-    fn state_fields(&self, args: &[SedonaType]) -> Result<Vec<FieldRef>> {
-        let arg_fields = args
-            .iter()
-            .map(|arg| arg.to_storage_field("", true).map(Arc::new))
-            .collect::<Result<Vec<_>>>()?;
-
-        let state_field_args = StateFieldsArgs {
-            name: "",
-            input_fields: &arg_fields,
-            return_field: Arc::new(Field::new("", DataType::Null, false)),
-            ordering_fields: &[],
-            is_distinct: false,
-        };
-
-        self.aggregate_impl.state_fields(state_field_args)
-    }
-}
-
-#[cfg(test)]
-mod test {
-    use datafusion_expr::Volatility;
-    use sedona_expr::scalar_udf::{SedonaScalarUDF, SimpleSedonaScalarKernel};
-    use sedona_schema::{datatypes::WKB_GEOMETRY, matchers::ArgMatcher};
-    use sedona_testing::{create::create_array, testers::ScalarUdfTester};
-
-    use super::*;
-
-    #[test]
-    fn ffi_roundtrip() {
-        let kernel = SimpleSedonaScalarKernel::new_ref(
-            ArgMatcher::new(vec![ArgMatcher::is_geometry()], WKB_GEOMETRY),
-            Arc::new(|_, args| Ok(args[0].clone())),
-        );
-
-        let array_value = create_array(&[Some("POINT (0 1)"), None], 
&WKB_GEOMETRY);
-
-        let udf_native = SedonaScalarUDF::new(
-            "simple_udf",
-            vec![kernel.clone()],
-            Volatility::Immutable,
-            None,
-        );
-
-        let tester = ScalarUdfTester::new(udf_native.into(), 
vec![WKB_GEOMETRY]);
-        tester.assert_return_type(WKB_GEOMETRY);
-
-        let result = tester.invoke_scalar("POINT (0 1)").unwrap();
-        tester.assert_scalar_result_equals(result, "POINT (0 1)");
-
-        assert_eq!(
-            &tester.invoke_array(array_value.clone()).unwrap(),
-            &array_value
-        );
-
-        let ffi_kernel = FFI_SedonaScalarKernel::from(kernel.clone());
-        let udf_from_ffi = SedonaScalarUDF::new(
-            "simple_udf_from_ffi",
-            vec![ffi_kernel.try_into().unwrap()],
-            Volatility::Immutable,
-            None,
-        );
-
-        let ffi_tester = ScalarUdfTester::new(udf_from_ffi.into(), 
vec![WKB_GEOMETRY]);
-        ffi_tester.assert_return_type(WKB_GEOMETRY);
-
-        let result = ffi_tester.invoke_scalar("POINT (0 1)").unwrap();
-        ffi_tester.assert_scalar_result_equals(result, "POINT (0 1)");
-
-        assert_eq!(
-            &ffi_tester.invoke_array(array_value.clone()).unwrap(),
-            &array_value
-        );
-    }
-}
diff --git a/rust/sedona/src/lib.rs b/rust/sedona/src/lib.rs
index 52b54387..9d18064c 100644
--- a/rust/sedona/src/lib.rs
+++ b/rust/sedona/src/lib.rs
@@ -17,7 +17,6 @@
 mod catalog;
 pub mod context;
 mod exec;
-pub mod ffi;
 mod object_storage;
 pub mod random_geometry_provider;
 pub mod reader;

Reply via email to