Copilot commented on code in PR #407:
URL: https://github.com/apache/sedona-db/pull/407#discussion_r2586874942


##########
c/sedona-extension/src/scalar_kernel.rs:
##########
@@ -0,0 +1,883 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::{
+    ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema},
+    make_array, ArrayRef,
+};
+use arrow_schema::{ArrowError, Field};
+use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
+use datafusion_expr::ColumnarValue;
+use sedona_common::sedona_internal_err;
+use sedona_expr::scalar_udf::{ScalarKernelRef, SedonaScalarKernel};
+use sedona_schema::datatypes::SedonaType;
+use std::{
+    ffi::{c_char, c_int, c_void, CStr, CString},
+    fmt::Debug,
+    iter::zip,
+    ptr::{null, null_mut, swap_nonoverlapping},
+    str::FromStr,
+};
+
+use crate::extension::{ffi_arrow_schema_is_valid, SedonaCScalarKernel, 
SedonaCScalarKernelImpl};
+
+/// Wrapper around a [SedonaCScalarKernel] that implements [SedonaScalarKernel]
+///
+/// This is the means by which a kernel implementation may be imported from a
+/// C implementation.
+pub struct ImportedScalarKernel {
+    inner: SedonaCScalarKernel,
+    function_name: Option<String>,
+}
+
+impl Debug for ImportedScalarKernel {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ImportedScalarKernel")
+            .field("inner", &"<SedonaCScalarKernel>")
+            .finish()
+    }
+}
+
+impl TryFrom<SedonaCScalarKernel> for ImportedScalarKernel {
+    type Error = DataFusionError;
+
+    fn try_from(value: SedonaCScalarKernel) -> Result<Self> {
+        match (
+            &value.function_name,
+            &value.new_impl,
+            &value.release,
+            value.private_data.is_null(),
+        ) {
+            (Some(function_name), Some(_), Some(_), false) => {
+                let name_ptr = unsafe { function_name(&value) };
+                let name = if name_ptr.is_null() {
+                    None
+                } else {
+                    Some(
+                        unsafe { CStr::from_ptr(name_ptr) }
+                            .to_string_lossy()
+                            .into_owned(),
+                    )
+                };
+
+                Ok(Self {
+                    inner: value,
+                    function_name: name,
+                })
+            }
+            _ => sedona_internal_err!("Can't import released or uninitialized 
SedonaCScalarKernel"),
+        }
+    }
+}
+
+impl ImportedScalarKernel {
+    pub fn function_name(&self) -> Option<&str> {
+        self.function_name.as_deref()
+    }
+}
+
+impl SedonaScalarKernel for ImportedScalarKernel {
+    fn return_type_from_args_and_scalars(
+        &self,
+        args: &[SedonaType],
+        scalar_args: &[Option<&ScalarValue>],
+    ) -> Result<Option<SedonaType>> {
+        let mut inner_impl = CScalarKernelImplWrapper::try_new(&self.inner)?;
+        inner_impl.init(args, scalar_args)
+    }
+
+    fn invoke_batch_from_args(
+        &self,
+        arg_types: &[SedonaType],
+        args: &[ColumnarValue],
+        return_type: &SedonaType,
+        num_rows: usize,
+    ) -> Result<ColumnarValue> {
+        let arg_scalars = args
+            .iter()
+            .map(|arg| {
+                if let ColumnarValue::Scalar(scalar) = arg {
+                    Some(scalar)
+                } else {
+                    None
+                }
+            })
+            .collect::<Vec<_>>();
+
+        let mut inner_impl = CScalarKernelImplWrapper::try_new(&self.inner)?;
+        inner_impl.init(arg_types, &arg_scalars)?;
+        let result_array = inner_impl.execute(args, return_type, num_rows)?;
+        for arg in args {
+            if let ColumnarValue::Array(_) = arg {
+                return Ok(ColumnarValue::Array(result_array));
+            }
+        }
+
+        if result_array.len() != 1 {
+            sedona_internal_err!(
+                "Expected scalar result but got result with length {}",
+                result_array.len()
+            )
+        } else {
+            Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
+                &result_array,
+                0,
+            )?))
+        }
+    }
+
+    fn return_type(&self, _args: &[SedonaType]) -> Result<Option<SedonaType>> {
+        sedona_internal_err!(
+            "Should not be called because return_type_from_args_and_scalars() 
is implemented"
+        )
+    }
+
+    fn invoke_batch(
+        &self,
+        _arg_types: &[SedonaType],
+        _args: &[ColumnarValue],
+    ) -> Result<ColumnarValue> {
+        sedona_internal_err!("Should not be called because 
invoke_batch_from_args() is implemented")
+    }
+}
+
+/// Wrapper class handling the verbose details of preparing and executing FFI 
calls
+/// for the [SedonaCScalarKernelImpl]
+struct CScalarKernelImplWrapper {
+    inner: SedonaCScalarKernelImpl,
+}
+
+impl CScalarKernelImplWrapper {
+    fn try_new(factory: &SedonaCScalarKernel) -> Result<Self> {
+        if let Some(init) = factory.new_impl {
+            let mut inner = SedonaCScalarKernelImpl::default();
+            unsafe { init(factory, &mut inner) };
+            Ok(Self { inner })
+        } else {
+            sedona_internal_err!("SedonaCScalarKernel is not valid")
+        }
+    }
+
+    fn init(
+        &mut self,
+        arg_types: &[SedonaType],
+        arg_scalars: &[Option<&ScalarValue>],
+    ) -> Result<Option<SedonaType>> {
+        if arg_types.len() != arg_scalars.len() {
+            return sedona_internal_err!("field/scalar lengths must be 
identical");
+        }
+
+        // Convert arg_types to Vec<Field>
+        let arg_fields = arg_types
+            .iter()
+            .map(|sedona_type| sedona_type.to_storage_field("", true))
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert arg types to Vec<FFI_ArrowSchema>
+        let ffi_fields = arg_fields
+            .iter()
+            .map(FFI_ArrowSchema::try_from)
+            .collect::<Result<Vec<_>, ArrowError>>()?;
+
+        // Convert arg types to Vec<*const FFI_ArrowSchema>
+        let ffi_field_ptrs = ffi_fields
+            .iter()
+            .map(|ffi_field| ffi_field as *const FFI_ArrowSchema)
+            .collect::<Vec<_>>();
+
+        // Convert arg_scalars to Vec<Option<FFI_ArrowArray>>
+        let mut ffi_scalars = arg_scalars
+            .iter()
+            .map(|maybe_scalar| {
+                if let Some(scalar) = maybe_scalar {
+                    let array = scalar.to_array()?;
+                    Ok(Some(FFI_ArrowArray::new(&array.to_data())))
+                } else {
+                    Ok(None)
+                }
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert arg_scalars to Vec<*mut FFI_ArrowArray>
+        let mut ffi_scalar_ptrs = ffi_scalars
+            .iter_mut()
+            .map(|maybe_ffi_scalar| match maybe_ffi_scalar {
+                Some(ffi_scalar) => ffi_scalar as *mut FFI_ArrowArray,
+                None => null_mut(),
+            })
+            .collect::<Vec<_>>();
+
+        // Call the FFI implementation of init
+        if let Some(init) = self.inner.init {
+            let mut ffi_out = FFI_ArrowSchema::empty();
+            let code = unsafe {
+                init(
+                    &mut self.inner,
+                    ffi_field_ptrs.as_ptr(),
+                    ffi_scalar_ptrs.as_mut_ptr(),
+                    arg_types.len() as i64,
+                    &mut ffi_out,
+                )
+            };
+
+            // On success, convert the output to SedonaType. If the 
implementation
+            // returned a "released" schema, this is the equivalent to our 
return_type()
+            // returning None (for "this kernel doesn't apply").
+            // On error, query the FFI implementation for the last error 
string.
+            if code == 0 {
+                if ffi_arrow_schema_is_valid(&ffi_out) {
+                    let field = Field::try_from(&ffi_out)?;
+                    Ok(Some(SedonaType::from_storage_field(&field)?))
+                } else {
+                    Ok(None)
+                }
+            } else {
+                plan_err!(
+                    "SedonaCScalarKernelImpl::init failed: {}",
+                    self.last_error(code)
+                )
+            }
+        } else {
+            sedona_internal_err!("Invalid SedonaCScalarKernelImpl")
+        }
+    }
+
+    fn execute(
+        &mut self,
+        args: &[ColumnarValue],
+        return_type: &SedonaType,
+        num_rows: usize,
+    ) -> Result<ArrayRef> {
+        // Convert args to Vec<ArrayRef>
+        let arg_arrays = args
+            .iter()
+            .map(|arg| match arg {
+                ColumnarValue::Array(array) => Ok(array.clone()),
+                ColumnarValue::Scalar(scalar_value) => scalar_value.to_array(),
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert args to Vec<FFI_ArrowArray>
+        let mut ffi_args = arg_arrays
+            .iter()
+            .map(|arg| FFI_ArrowArray::new(&arg.to_data()))
+            .collect::<Vec<_>>();
+
+        // Call the FFI implementation of execute()
+        if let Some(execute) = self.inner.execute {
+            let mut ffi_out = FFI_ArrowArray::empty();
+            let code = unsafe {
+                execute(
+                    &mut self.inner,
+                    &mut ffi_args.as_mut_ptr(),
+                    args.len() as i64,
+                    num_rows as i64,
+                    &mut ffi_out,
+                )
+            };
+
+            // On success, convert the result to an ArrayRef.
+            // On error, query the FFI implementation for the last error 
string.
+            if code == 0 {
+                let data = unsafe {
+                    arrow_array::ffi::from_ffi_and_data_type(
+                        ffi_out,
+                        return_type.storage_type().clone(),
+                    )?
+                };
+                Ok(arrow_array::make_array(data))
+            } else {
+                plan_err!(
+                    "SedonaCScalarKernelImpl::init failed: {}",

Review Comment:
   The error message says 'init failed', but this code is in the `execute()` function.
The message should read 'SedonaCScalarKernelImpl::execute failed'.
   ```suggestion
                       "SedonaCScalarKernelImpl::execute failed: {}",
   ```



##########
c/sedona-extension/src/extension.rs:
##########
@@ -0,0 +1,122 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    ffi::c_int,
+    os::raw::{c_char, c_void},
+    ptr::null_mut,
+};
+
+use arrow_array::ffi::{FFI_ArrowArray, FFI_ArrowSchema};
+
+/// Raw FFI representation of the SedonaCScalarKernel
+///
+/// See the ImportedScalarKernel and ExportedScalarKernel for high-level
+/// APIs to import and export implementations using this struct.
+#[derive(Default)]
+#[repr(C)]
+pub struct SedonaCScalarKernel {
+    pub function_name:
+        Option<unsafe extern "C" fn(self_: *const SedonaCScalarKernel) -> 
*const c_char>,
+    pub new_impl: Option<
+        unsafe extern "C" fn(self_: *const SedonaCScalarKernel, out: *mut 
SedonaCScalarKernelImpl),
+    >,
+
+    pub release: Option<unsafe extern "C" fn(self_: *mut SedonaCScalarKernel)>,
+    pub private_data: *mut c_void,
+}
+
+unsafe impl Send for SedonaCScalarKernel {}
+unsafe impl Sync for SedonaCScalarKernel {}
+
+impl Drop for SedonaCScalarKernel {
+    fn drop(&mut self) {
+        if let Some(releaser) = self.release {
+            unsafe { releaser(self) }
+            self.release = None;
+            self.private_data = null_mut();
+        }
+    }
+}
+
+/// Raw FFI representation of the SedonaCScalarKernelImpl
+#[derive(Default)]
+#[repr(C)]
+pub struct SedonaCScalarKernelImpl {
+    pub init: Option<
+        unsafe extern "C" fn(
+            self_: *mut SedonaCScalarKernelImpl,
+            arg_types: *const *const FFI_ArrowSchema,
+            scalar_args: *mut *mut FFI_ArrowArray,
+            n_args: i64,
+            out: *mut FFI_ArrowSchema,
+        ) -> c_int,
+    >,
+
+    pub execute: Option<
+        unsafe extern "C" fn(
+            self_: *mut SedonaCScalarKernelImpl,
+            args: *mut *mut FFI_ArrowArray,
+            n_args: i64,
+            n_rows: i64,
+            out: *mut FFI_ArrowArray,
+        ) -> c_int,
+    >,
+
+    pub get_last_error:
+        Option<unsafe extern "C" fn(self_: *mut SedonaCScalarKernelImpl) -> 
*const c_char>,
+
+    pub release: Option<unsafe extern "C" fn(self_: *mut 
SedonaCScalarKernelImpl)>,
+
+    pub private_data: *mut c_void,
+}
+
+impl Drop for SedonaCScalarKernelImpl {
+    fn drop(&mut self) {
+        if let Some(releaser) = self.release {
+            unsafe { releaser(self) }
+            self.release = None;
+            self.private_data = null_mut();
+        }
+    }
+}
+
+/// Check if a schema is valid
+///
+/// The [FFI_ArrowSchema] doesn't have the ability to check for a NULL release 
callback,
+/// so we provide a mechanism to do so here.
+pub fn ffi_arrow_schema_is_valid(ffi_schema: *const FFI_ArrowSchema) -> bool {
+    let ffi_schema_internal = ffi_schema as *const c_void as *const 
ArrowSchemaInternal;
+    if let Some(schema_ref) = unsafe { ffi_schema_internal.as_ref() } {

Review Comment:
   [nitpick] The parameter name `ffi_schema` is inconsistent with the local
names derived from it (`ffi_schema_internal`, `schema_ref`). Consider renaming
the parameter to `schema`, or updating the cast on line 103 of `extension.rs`
to use the parameter name consistently.
   ```suggestion
   pub fn ffi_arrow_schema_is_valid(schema: *const FFI_ArrowSchema) -> bool {
       let schema_internal = schema as *const c_void as *const 
ArrowSchemaInternal;
       if let Some(schema_ref) = unsafe { schema_internal.as_ref() } {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to