This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a7f64a85e feat: Add Window UDFs to FFI Crate (#16261)
2a7f64a85e is described below

commit 2a7f64a85e3d98c51c106607a425d73d2b839e82
Author: Tim Saucer <timsau...@gmail.com>
AuthorDate: Thu Jun 5 16:42:56 2025 -0400

    feat: Add Window UDFs to FFI Crate (#16261)
    
    * Initial commit of UDWF via FFI
    
    * Work in progress on integration testing of udwf
    
    * Rebase due to UDF changes upstream
---
 datafusion/ffi/src/lib.rs                          |   1 +
 datafusion/ffi/src/tests/mod.rs                    |   9 +-
 datafusion/ffi/src/tests/udf_udaf_udwf.rs          |  20 +-
 datafusion/ffi/src/udwf/mod.rs                     | 366 +++++++++++++++++++++
 datafusion/ffi/src/udwf/partition_evaluator.rs     | 320 ++++++++++++++++++
 .../ffi/src/udwf/partition_evaluator_args.rs       | 174 ++++++++++
 datafusion/ffi/src/udwf/range.rs                   |  64 ++++
 datafusion/ffi/src/volatility.rs                   |   2 +-
 datafusion/ffi/tests/ffi_udwf.rs                   |  68 ++++
 9 files changed, 1019 insertions(+), 5 deletions(-)

diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs
index 755c460f31..ff641e8315 100644
--- a/datafusion/ffi/src/lib.rs
+++ b/datafusion/ffi/src/lib.rs
@@ -37,6 +37,7 @@ pub mod table_source;
 pub mod udaf;
 pub mod udf;
 pub mod udtf;
+pub mod udwf;
 pub mod util;
 pub mod volatility;
 
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index f65ed7441b..db596f51fc 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -31,6 +31,8 @@ use crate::{catalog_provider::FFI_CatalogProvider, 
udtf::FFI_TableFunction};
 
 use crate::udaf::FFI_AggregateUDF;
 
+use crate::udwf::FFI_WindowUDF;
+
 use super::{table_provider::FFI_TableProvider, udf::FFI_ScalarUDF};
 use arrow::array::RecordBatch;
 use async_provider::create_async_table_provider;
@@ -40,8 +42,8 @@ use datafusion::{
 };
 use sync_provider::create_sync_table_provider;
 use udf_udaf_udwf::{
-    create_ffi_abs_func, create_ffi_random_func, create_ffi_stddev_func,
-    create_ffi_sum_func, create_ffi_table_func,
+    create_ffi_abs_func, create_ffi_random_func, create_ffi_rank_func,
+    create_ffi_stddev_func, create_ffi_sum_func, create_ffi_table_func,
 };
 
 mod async_provider;
@@ -76,6 +78,8 @@ pub struct ForeignLibraryModule {
     /// Createa  grouping UDAF using stddev
     pub create_stddev_udaf: extern "C" fn() -> FFI_AggregateUDF,
 
+    pub create_rank_udwf: extern "C" fn() -> FFI_WindowUDF,
+
     pub version: extern "C" fn() -> u64,
 }
 
@@ -125,6 +129,7 @@ pub fn get_foreign_library_module() -> 
ForeignLibraryModuleRef {
         create_table_function: create_ffi_table_func,
         create_sum_udaf: create_ffi_sum_func,
         create_stddev_udaf: create_ffi_stddev_func,
+        create_rank_udwf: create_ffi_rank_func,
         version: super::version,
     }
     .leak_into_prefix()
diff --git a/datafusion/ffi/src/tests/udf_udaf_udwf.rs 
b/datafusion/ffi/src/tests/udf_udaf_udwf.rs
index 6aa69bdd0c..55e31ef3ab 100644
--- a/datafusion/ffi/src/tests/udf_udaf_udwf.rs
+++ b/datafusion/ffi/src/tests/udf_udaf_udwf.rs
@@ -15,13 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::{udaf::FFI_AggregateUDF, udf::FFI_ScalarUDF, 
udtf::FFI_TableFunction};
+use crate::{
+    udaf::FFI_AggregateUDF, udf::FFI_ScalarUDF, udtf::FFI_TableFunction,
+    udwf::FFI_WindowUDF,
+};
 use datafusion::{
     catalog::TableFunctionImpl,
     functions::math::{abs::AbsFunc, random::RandomFunc},
     functions_aggregate::{stddev::Stddev, sum::Sum},
     functions_table::generate_series::RangeFunc,
-    logical_expr::{AggregateUDF, ScalarUDF},
+    functions_window::rank::Rank,
+    logical_expr::{AggregateUDF, ScalarUDF, WindowUDF},
 };
 
 use std::sync::Arc;
@@ -55,3 +59,15 @@ pub(crate) extern "C" fn create_ffi_stddev_func() -> 
FFI_AggregateUDF {
 
     udaf.into()
 }
+
+/// Build an FFI-safe basic `rank` window UDF registered under the name
+/// `rank_demo`; exposed through `ForeignLibraryModule::create_rank_udwf`.
+pub(crate) extern "C" fn create_ffi_rank_func() -> FFI_WindowUDF {
+    let rank = Rank::new(
+        "rank_demo".to_string(),
+        datafusion::functions_window::rank::RankType::Basic,
+    );
+    let udwf: Arc<WindowUDF> = Arc::new(WindowUDF::from(rank));
+
+    FFI_WindowUDF::from(udwf)
+}
diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs
new file mode 100644
index 0000000000..aaa3f5c992
--- /dev/null
+++ b/datafusion/ffi/src/udwf/mod.rs
@@ -0,0 +1,366 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{ffi::c_void, sync::Arc};
+
+use abi_stable::{
+    std_types::{ROption, RResult, RString, RVec},
+    StableAbi,
+};
+use arrow::datatypes::Schema;
+use arrow::{
+    compute::SortOptions,
+    datatypes::{DataType, SchemaRef},
+};
+use arrow_schema::{Field, FieldRef};
+use datafusion::{
+    error::DataFusionError,
+    logical_expr::{
+        function::WindowUDFFieldArgs, 
type_coercion::functions::fields_with_window_udf,
+        PartitionEvaluator,
+    },
+};
+use datafusion::{
+    error::Result,
+    logical_expr::{Signature, WindowUDF, WindowUDFImpl},
+};
+use partition_evaluator::{FFI_PartitionEvaluator, ForeignPartitionEvaluator};
+use partition_evaluator_args::{
+    FFI_PartitionEvaluatorArgs, ForeignPartitionEvaluatorArgs,
+};
+mod partition_evaluator;
+mod partition_evaluator_args;
+mod range;
+
+use crate::util::{rvec_wrapped_to_vec_fieldref, vec_fieldref_to_rvec_wrapped};
+use crate::{
+    arrow_wrappers::WrappedSchema,
+    df_result, rresult, rresult_return,
+    util::{rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped},
+    volatility::FFI_Volatility,
+};
+
+/// A stable struct for sharing a [`WindowUDF`] across FFI boundaries.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_WindowUDF {
+    /// FFI equivalent to the `name` of a [`WindowUDF`]
+    pub name: RString,
+
+    /// FFI equivalent to the `aliases` of a [`WindowUDF`]
+    pub aliases: RVec<RString>,
+
+    /// FFI equivalent to the `volatility` of a [`WindowUDF`]
+    pub volatility: FFI_Volatility,
+
+    /// Create a partition evaluator from FFI-encoded evaluator arguments.
+    /// The provider side forwards this to the wrapped UDF's
+    /// `partition_evaluator_factory`.
+    pub partition_evaluator:
+        unsafe extern "C" fn(
+            udwf: &Self,
+            args: FFI_PartitionEvaluatorArgs,
+        ) -> RResult<FFI_PartitionEvaluator, RString>,
+
+    /// Compute the output field of the window function. Despite the parameter
+    /// name, `input_types` carries the input *fields*, each wrapped as a schema.
+    /// The result is a schema whose first field is the output field.
+    pub field: unsafe extern "C" fn(
+        udwf: &Self,
+        input_types: RVec<WrappedSchema>,
+        display_name: RString,
+    ) -> RResult<WrappedSchema, RString>,
+
+    /// Performs type coercion. To simplify this interface, all UDFs are treated
+    /// as having user defined signatures, which in turn causes `coerce_types`
+    /// to be called. This call should be transparent to most users, as the
+    /// internal function performs the appropriate calls on the underlying
+    /// [`WindowUDF`].
+    pub coerce_types: unsafe extern "C" fn(
+        udf: &Self,
+        arg_types: RVec<WrappedSchema>,
+    ) -> RResult<RVec<WrappedSchema>, RString>,
+
+    /// FFI equivalent to the `sort_options` of a [`WindowUDF`], captured once
+    /// when this struct is created.
+    pub sort_options: ROption<FFI_SortOptions>,
+
+    /// Used to create a clone on the provider of the udf. This should
+    /// only need to be called by the receiver of the udf.
+    pub clone: unsafe extern "C" fn(udf: &Self) -> Self,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(udf: &mut Self),
+
+    /// Internal data. This is only to be accessed by the provider of the udf.
+    /// A [`ForeignWindowUDF`] should never attempt to access this data.
+    pub private_data: *mut c_void,
+}
+
+// NOTE(review): these impls assume the wrapped `WindowUDF` behind
+// `private_data` is itself safe to send and share across threads — confirm.
+unsafe impl Send for FFI_WindowUDF {}
+unsafe impl Sync for FFI_WindowUDF {}
+
+/// Provider-side state stored behind [`FFI_WindowUDF::private_data`].
+pub struct WindowUDFPrivateData {
+    pub udf: Arc<WindowUDF>,
+}
+
+impl FFI_WindowUDF {
+    /// Borrow the [`WindowUDF`] held in `private_data`.
+    ///
+    /// # Safety
+    /// Only valid on the provider side, where `private_data` points to a live
+    /// `WindowUDFPrivateData`.
+    unsafe fn inner(&self) -> &Arc<WindowUDF> {
+        let data = &*(self.private_data as *const WindowUDFPrivateData);
+        &data.udf
+    }
+}
+
+/// Provider-side `partition_evaluator`: decodes the FFI arguments, builds an
+/// evaluator from the wrapped [`WindowUDF`], and returns it as an
+/// [`FFI_PartitionEvaluator`].
+unsafe extern "C" fn partition_evaluator_fn_wrapper(
+    udwf: &FFI_WindowUDF,
+    args: FFI_PartitionEvaluatorArgs,
+) -> RResult<FFI_PartitionEvaluator, RString> {
+    let udf = udwf.inner();
+
+    // Owned decoded arguments; the borrowed `PartitionEvaluatorArgs` handed to
+    // the factory references this value.
+    let owned_args = rresult_return!(ForeignPartitionEvaluatorArgs::try_from(args));
+    let evaluator = rresult_return!(udf.partition_evaluator_factory((&owned_args).into()));
+
+    RResult::ROk(FFI_PartitionEvaluator::from(evaluator))
+}
+
+/// Provider-side `field`: computes the wrapped UDF's output field and sends
+/// it back as the only field of a schema.
+unsafe extern "C" fn field_fn_wrapper(
+    udwf: &FFI_WindowUDF,
+    input_fields: RVec<WrappedSchema>,
+    display_name: RString,
+) -> RResult<WrappedSchema, RString> {
+    let udf = udwf.inner();
+
+    let fields = rresult_return!(rvec_wrapped_to_vec_fieldref(&input_fields));
+    let field_args = WindowUDFFieldArgs::new(&fields, display_name.as_str());
+    let output_field = rresult_return!(udf.field(field_args));
+
+    // A schema is used as the transport for a single field.
+    let schema = Arc::new(Schema::new(vec![output_field]));
+    RResult::ROk(WrappedSchema::from(schema))
+}
+
+/// Provider-side `coerce_types`: runs the wrapped UDF's coercion on the
+/// incoming data types and returns the coerced data types.
+unsafe extern "C" fn coerce_types_fn_wrapper(
+    udwf: &FFI_WindowUDF,
+    arg_types: RVec<WrappedSchema>,
+) -> RResult<RVec<WrappedSchema>, RString> {
+    let udf = udwf.inner();
+
+    let data_types = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types));
+    // `fields_with_window_udf` operates on fields, so wrap each type in a
+    // placeholder non-nullable field named "f".
+    let arg_fields: Vec<FieldRef> = data_types
+        .into_iter()
+        .map(|data_type| Arc::new(Field::new("f", data_type, false)))
+        .collect();
+
+    let coerced_fields = rresult_return!(fields_with_window_udf(&arg_fields, udf));
+    let coerced_types: Vec<DataType> = coerced_fields
+        .iter()
+        .map(|field| field.data_type().clone())
+        .collect();
+
+    rresult!(vec_datatype_to_rvec_wrapped(&coerced_types))
+}
+
+/// Free the `WindowUDFPrivateData` that `From`/`clone` leaked with
+/// `Box::into_raw`.
+unsafe extern "C" fn release_fn_wrapper(udwf: &mut FFI_WindowUDF) {
+    drop(Box::from_raw(udwf.private_data as *mut WindowUDFPrivateData));
+}
+
+/// Provider-side `clone`: builds a new `FFI_WindowUDF` with its own private
+/// data holding another `Arc` handle to the same [`WindowUDF`].
+unsafe extern "C" fn clone_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_WindowUDF {
+    let private_data = Box::new(WindowUDFPrivateData {
+        udf: Arc::clone(udwf.inner()),
+    });
+
+    FFI_WindowUDF {
+        name: udwf.name.clone(),
+        aliases: udwf.aliases.clone(),
+        volatility: udwf.volatility.clone(),
+        partition_evaluator: partition_evaluator_fn_wrapper,
+        sort_options: udwf.sort_options.clone(),
+        coerce_types: coerce_types_fn_wrapper,
+        field: field_fn_wrapper,
+        clone: clone_fn_wrapper,
+        release: release_fn_wrapper,
+        private_data: Box::into_raw(private_data) as *mut c_void,
+    }
+}
+
+impl Clone for FFI_WindowUDF {
+    /// Clone through the provider's `clone` callback, so `private_data` is
+    /// duplicated by the library that owns it.
+    fn clone(&self) -> Self {
+        let clone_fn = self.clone;
+        unsafe { clone_fn(self) }
+    }
+}
+
+impl From<Arc<WindowUDF>> for FFI_WindowUDF {
+    /// Wrap a [`WindowUDF`] for export. Name, aliases, volatility and sort
+    /// options are copied eagerly. The UDF itself is kept in boxed private
+    /// data until `release` is called.
+    fn from(udf: Arc<WindowUDF>) -> Self {
+        let name = RString::from(udf.name());
+        let aliases: RVec<RString> = udf
+            .aliases()
+            .iter()
+            .map(|alias| RString::from(alias.clone()))
+            .collect();
+        let volatility = FFI_Volatility::from(udf.signature().volatility);
+        let sort_options: ROption<FFI_SortOptions> = udf
+            .sort_options()
+            .map(|options| FFI_SortOptions::from(&options))
+            .into();
+
+        let private_data = Box::new(WindowUDFPrivateData { udf });
+
+        FFI_WindowUDF {
+            name,
+            aliases,
+            volatility,
+            partition_evaluator: partition_evaluator_fn_wrapper,
+            field: field_fn_wrapper,
+            coerce_types: coerce_types_fn_wrapper,
+            sort_options,
+            clone: clone_fn_wrapper,
+            release: release_fn_wrapper,
+            private_data: Box::into_raw(private_data) as *mut c_void,
+        }
+    }
+}
+
+impl Drop for FFI_WindowUDF {
+    /// Free `private_data` through the provider's `release` callback.
+    fn drop(&mut self) {
+        let release = self.release;
+        unsafe { release(self) };
+    }
+}
+
+/// This struct is used to access a UDF provided by a foreign
+/// library across an FFI boundary.
+///
+/// The ForeignWindowUDF is to be used by the caller of the UDF, so it has
+/// no knowledge or access to the private data. All interaction with the UDF
+/// must occur through the functions defined in FFI_WindowUDF.
+#[derive(Debug)]
+pub struct ForeignWindowUDF {
+    /// Local copy of `FFI_WindowUDF::name`.
+    name: String,
+    /// Local copy of `FFI_WindowUDF::aliases`.
+    aliases: Vec<String>,
+    /// The FFI handle all calls are forwarded through.
+    udf: FFI_WindowUDF,
+    /// Always a user-defined signature, so coercion goes through `coerce_types`.
+    signature: Signature,
+}
+
+unsafe impl Send for ForeignWindowUDF {}
+unsafe impl Sync for ForeignWindowUDF {}
+
+impl TryFrom<&FFI_WindowUDF> for ForeignWindowUDF {
+    type Error = DataFusionError;
+
+    /// Build a local handle by copying the metadata and cloning the FFI struct
+    /// through its provider callback.
+    fn try_from(udf: &FFI_WindowUDF) -> Result<Self, Self::Error> {
+        // A user-defined signature routes all type coercion back across the
+        // boundary via `coerce_types`.
+        let signature = Signature::user_defined((&udf.volatility).into());
+
+        Ok(Self {
+            name: udf.name.to_string(),
+            aliases: udf.aliases.iter().map(|alias| alias.to_string()).collect(),
+            udf: udf.clone(),
+            signature,
+        })
+    }
+}
+
+impl WindowUDFImpl for ForeignWindowUDF {
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn name(&self) -> &str {
+        self.name.as_str()
+    }
+
+    fn signature(&self) -> &Signature {
+        &self.signature
+    }
+
+    fn aliases(&self) -> &[String] {
+        self.aliases.as_slice()
+    }
+
+    /// Forward coercion to the provider. Argument and result types cross the
+    /// boundary as wrapped schemas.
+    fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
+        let wrapped_args = vec_datatype_to_rvec_wrapped(arg_types)?;
+        // SAFETY: the callback and `self.udf` were produced by the same provider.
+        let wrapped_result = unsafe { (self.udf.coerce_types)(&self.udf, wrapped_args) };
+        let coerced = df_result!(wrapped_result)?;
+        Ok(rvec_wrapped_to_vec_datatype(&coerced)?)
+    }
+
+    fn partition_evaluator(
+        &self,
+        args: datafusion::logical_expr::function::PartitionEvaluatorArgs,
+    ) -> Result<Box<dyn PartitionEvaluator>> {
+        let ffi_args = FFI_PartitionEvaluatorArgs::try_from(args)?;
+        // SAFETY: the callback and `self.udf` were produced by the same provider.
+        let result = unsafe { (self.udf.partition_evaluator)(&self.udf, ffi_args) };
+        let evaluator = df_result!(result)?;
+        Ok(Box::new(ForeignPartitionEvaluator::from(evaluator)))
+    }
+
+    /// Ask the provider for the output field. It replies with a schema whose
+    /// first field is the result.
+    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
+        let input_fields = vec_fieldref_to_rvec_wrapped(field_args.input_fields())?;
+        // SAFETY: the callback and `self.udf` were produced by the same provider.
+        let result = unsafe {
+            (self.udf.field)(&self.udf, input_fields, field_args.name().into())
+        };
+        let schema: SchemaRef = df_result!(result)?.into();
+
+        if schema.fields().is_empty() {
+            return Err(DataFusionError::Execution(
+                "Unable to retrieve field in WindowUDF via FFI".to_string(),
+            ));
+        }
+        Ok(schema.field(0).to_owned().into())
+    }
+
+    fn sort_options(&self) -> Option<SortOptions> {
+        let ffi_options: Option<&FFI_SortOptions> = self.udf.sort_options.as_ref().into();
+        ffi_options.map(SortOptions::from)
+    }
+}
+
+/// A stable struct for sharing [`SortOptions`] across FFI boundaries.
+#[repr(C)]
+#[derive(Debug, StableAbi, Clone)]
+#[allow(non_camel_case_types)]
+pub struct FFI_SortOptions {
+    pub descending: bool,
+    pub nulls_first: bool,
+}
+
+impl From<&SortOptions> for FFI_SortOptions {
+    fn from(options: &SortOptions) -> Self {
+        FFI_SortOptions {
+            nulls_first: options.nulls_first,
+            descending: options.descending,
+        }
+    }
+}
+
+impl From<&FFI_SortOptions> for SortOptions {
+    fn from(options: &FFI_SortOptions) -> Self {
+        let &FFI_SortOptions {
+            descending,
+            nulls_first,
+        } = options;
+        SortOptions {
+            descending,
+            nulls_first,
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {}
diff --git a/datafusion/ffi/src/udwf/partition_evaluator.rs 
b/datafusion/ffi/src/udwf/partition_evaluator.rs
new file mode 100644
index 0000000000..995d00cce3
--- /dev/null
+++ b/datafusion/ffi/src/udwf/partition_evaluator.rs
@@ -0,0 +1,320 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{ffi::c_void, ops::Range};
+
+use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return};
+use abi_stable::{
+    std_types::{RResult, RString, RVec},
+    StableAbi,
+};
+use arrow::{array::ArrayRef, error::ArrowError};
+use datafusion::{
+    error::{DataFusionError, Result},
+    logical_expr::{window_state::WindowAggState, PartitionEvaluator},
+    scalar::ScalarValue,
+};
+use prost::Message;
+
+use super::range::FFI_Range;
+
+/// A stable struct for sharing [`PartitionEvaluator`] across FFI boundaries.
+/// For an explanation of each field, see the corresponding function
+/// defined in [`PartitionEvaluator`].
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_PartitionEvaluator {
+    /// FFI equivalent of `PartitionEvaluator::evaluate_all`.
+    pub evaluate_all: unsafe extern "C" fn(
+        evaluator: &mut Self,
+        values: RVec<WrappedArray>,
+        num_rows: usize,
+    ) -> RResult<WrappedArray, RString>,
+
+    /// FFI equivalent of `PartitionEvaluator::evaluate`. The resulting
+    /// `ScalarValue` is returned as protobuf-encoded bytes.
+    pub evaluate: unsafe extern "C" fn(
+        evaluator: &mut Self,
+        values: RVec<WrappedArray>,
+        range: FFI_Range,
+    ) -> RResult<RVec<u8>, RString>,
+
+    /// FFI equivalent of `PartitionEvaluator::evaluate_all_with_rank`.
+    pub evaluate_all_with_rank: unsafe extern "C" fn(
+        evaluator: &Self,
+        num_rows: usize,
+        ranks_in_partition: RVec<FFI_Range>,
+    )
+        -> RResult<WrappedArray, RString>,
+
+    /// FFI equivalent of `PartitionEvaluator::get_range`.
+    pub get_range: unsafe extern "C" fn(
+        evaluator: &Self,
+        idx: usize,
+        n_rows: usize,
+    ) -> RResult<FFI_Range, RString>,
+
+    /// Value of `PartitionEvaluator::is_causal`, captured at construction.
+    pub is_causal: bool,
+
+    /// Value of `PartitionEvaluator::supports_bounded_execution`, captured at construction.
+    pub supports_bounded_execution: bool,
+    /// Value of `PartitionEvaluator::uses_window_frame`, captured at construction.
+    pub uses_window_frame: bool,
+    /// Value of `PartitionEvaluator::include_rank`, captured at construction.
+    pub include_rank: bool,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(evaluator: &mut Self),
+
+    /// Internal data. This is only to be accessed by the provider of the evaluator.
+    /// A [`ForeignPartitionEvaluator`] should never attempt to access this data.
+    pub private_data: *mut c_void,
+}
+
+// NOTE(review): these impls assume the boxed evaluator behind `private_data`
+// is safe to send and share across threads — confirm.
+unsafe impl Send for FFI_PartitionEvaluator {}
+unsafe impl Sync for FFI_PartitionEvaluator {}
+
+/// Provider-side state stored behind [`FFI_PartitionEvaluator::private_data`].
+pub struct PartitionEvaluatorPrivateData {
+    pub evaluator: Box<dyn PartitionEvaluator>,
+}
+
+impl FFI_PartitionEvaluator {
+    /// Mutably borrow the wrapped evaluator.
+    ///
+    /// # Safety
+    /// Only valid on the provider side, where `private_data` points to a live
+    /// `PartitionEvaluatorPrivateData`.
+    unsafe fn inner_mut(&mut self) -> &mut Box<(dyn PartitionEvaluator + 'static)> {
+        let data = &mut *(self.private_data as *mut PartitionEvaluatorPrivateData);
+        &mut data.evaluator
+    }
+
+    /// Immutably borrow the wrapped evaluator.
+    ///
+    /// # Safety
+    /// Same requirements as [`Self::inner_mut`].
+    unsafe fn inner(&self) -> &(dyn PartitionEvaluator + 'static) {
+        let data = &*(self.private_data as *const PartitionEvaluatorPrivateData);
+        data.evaluator.as_ref()
+    }
+}
+
+/// Provider-side `evaluate_all`: converts the FFI arrays, evaluates the whole
+/// partition with the wrapped evaluator, and wraps the resulting array.
+unsafe extern "C" fn evaluate_all_fn_wrapper(
+    evaluator: &mut FFI_PartitionEvaluator,
+    values: RVec<WrappedArray>,
+    num_rows: usize,
+) -> RResult<WrappedArray, RString> {
+    let inner = evaluator.inner_mut();
+
+    let arrays = rresult_return!(values
+        .into_iter()
+        .map(|wrapped| wrapped.try_into().map_err(DataFusionError::from))
+        .collect::<Result<Vec<ArrayRef>>>());
+
+    let output = inner.evaluate_all(&arrays, num_rows);
+    rresult!(output.and_then(|array| {
+        WrappedArray::try_from(&array).map_err(DataFusionError::from)
+    }))
+}
+
+unsafe extern "C" fn evaluate_fn_wrapper(
+    evaluator: &mut FFI_PartitionEvaluator,
+    values: RVec<WrappedArray>,
+    range: FFI_Range,
+) -> RResult<RVec<u8>, RString> {
+    let inner = evaluator.inner_mut();
+
+    let values_arrays = values
+        .into_iter()
+        .map(|v| v.try_into().map_err(DataFusionError::from))
+        .collect::<Result<Vec<ArrayRef>>>();
+    let values_arrays = rresult_return!(values_arrays);
+
+    // let return_array = (inner.evaluate(&values_arrays, &range.into()));
+    // .and_then(|array| 
WrappedArray::try_from(&array).map_err(DataFusionError::from));
+    let scalar_result = rresult_return!(inner.evaluate(&values_arrays, 
&range.into()));
+    let proto_result: datafusion_proto::protobuf::ScalarValue =
+        rresult_return!((&scalar_result).try_into());
+
+    RResult::ROk(proto_result.encode_to_vec().into())
+}
+
+/// Provider-side `evaluate_all_with_rank`: converts the FFI ranges and
+/// delegates to the wrapped evaluator.
+unsafe extern "C" fn evaluate_all_with_rank_fn_wrapper(
+    evaluator: &FFI_PartitionEvaluator,
+    num_rows: usize,
+    ranks_in_partition: RVec<FFI_Range>,
+) -> RResult<WrappedArray, RString> {
+    let inner = evaluator.inner();
+    let ranks: Vec<Range<usize>> = ranks_in_partition.into_iter().map(Into::into).collect();
+
+    let result = inner
+        .evaluate_all_with_rank(num_rows, &ranks)
+        .and_then(|array| WrappedArray::try_from(&array).map_err(DataFusionError::from));
+
+    rresult!(result)
+}
+
+/// Provider-side `get_range`: delegates to the wrapped evaluator and converts
+/// the resulting range.
+unsafe extern "C" fn get_range_fn_wrapper(
+    evaluator: &FFI_PartitionEvaluator,
+    idx: usize,
+    n_rows: usize,
+) -> RResult<FFI_Range, RString> {
+    rresult!(evaluator
+        .inner()
+        .get_range(idx, n_rows)
+        .map(FFI_Range::from))
+}
+
+/// Free the `PartitionEvaluatorPrivateData` leaked by
+/// `From<Box<dyn PartitionEvaluator>>`, dropping the wrapped evaluator.
+unsafe extern "C" fn release_fn_wrapper(evaluator: &mut FFI_PartitionEvaluator) {
+    let raw = evaluator.private_data as *mut PartitionEvaluatorPrivateData;
+    drop(Box::from_raw(raw));
+}
+
+impl From<Box<dyn PartitionEvaluator>> for FFI_PartitionEvaluator {
+    fn from(evaluator: Box<dyn PartitionEvaluator>) -> Self {
+        // Snapshot the capability flags before the evaluator is moved behind
+        // the opaque private-data pointer.
+        let (is_causal, supports_bounded_execution, include_rank, uses_window_frame) = (
+            evaluator.is_causal(),
+            evaluator.supports_bounded_execution(),
+            evaluator.include_rank(),
+            evaluator.uses_window_frame(),
+        );
+        let private_data = Box::new(PartitionEvaluatorPrivateData { evaluator });
+
+        FFI_PartitionEvaluator {
+            evaluate_all: evaluate_all_fn_wrapper,
+            evaluate: evaluate_fn_wrapper,
+            evaluate_all_with_rank: evaluate_all_with_rank_fn_wrapper,
+            get_range: get_range_fn_wrapper,
+            is_causal,
+            supports_bounded_execution,
+            uses_window_frame,
+            include_rank,
+            release: release_fn_wrapper,
+            private_data: Box::into_raw(private_data) as *mut c_void,
+        }
+    }
+}
+
+impl Drop for FFI_PartitionEvaluator {
+    /// Free `private_data` through the provider's `release` callback.
+    fn drop(&mut self) {
+        let release = self.release;
+        unsafe { release(self) };
+    }
+}
+
+/// This struct is used to access a partition evaluator provided by a foreign
+/// library across an FFI boundary.
+///
+/// The ForeignPartitionEvaluator is to be used by the caller of the UDF, so it has
+/// no knowledge or access to the private data. All interaction with the evaluator
+/// must occur through the functions defined in FFI_PartitionEvaluator.
+#[derive(Debug)]
+pub struct ForeignPartitionEvaluator {
+    evaluator: FFI_PartitionEvaluator,
+}
+
+unsafe impl Send for ForeignPartitionEvaluator {}
+unsafe impl Sync for ForeignPartitionEvaluator {}
+
+/// Take ownership of an FFI evaluator; it is released when this wrapper drops.
+impl From<FFI_PartitionEvaluator> for ForeignPartitionEvaluator {
+    fn from(evaluator: FFI_PartitionEvaluator) -> Self {
+        Self { evaluator }
+    }
+}
+
+impl PartitionEvaluator for ForeignPartitionEvaluator {
+    fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> {
+        // Exposing `memoize` increases the surface are of the FFI work
+        // so for now we dot support it.
+        Ok(())
+    }
+
+    fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> {
+        let range = unsafe { (self.evaluator.get_range)(&self.evaluator, idx, 
n_rows) };
+        df_result!(range).map(Range::from)
+    }
+
+    /// Get whether evaluator needs future data for its result (if so returns 
`false`) or not
+    fn is_causal(&self) -> bool {
+        self.evaluator.is_causal
+    }
+
+    fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> 
Result<ArrayRef> {
+        let result = unsafe {
+            let values = values
+                .iter()
+                .map(WrappedArray::try_from)
+                .collect::<std::result::Result<RVec<_>, ArrowError>>()?;
+            (self.evaluator.evaluate_all)(&mut self.evaluator, values, 
num_rows)
+        };
+
+        let array = df_result!(result)?;
+
+        Ok(array.try_into()?)
+    }
+
+    fn evaluate(
+        &mut self,
+        values: &[ArrayRef],
+        range: &Range<usize>,
+    ) -> Result<ScalarValue> {
+        unsafe {
+            let values = values
+                .iter()
+                .map(WrappedArray::try_from)
+                .collect::<std::result::Result<RVec<_>, ArrowError>>()?;
+
+            let scalar_bytes = df_result!((self.evaluator.evaluate)(
+                &mut self.evaluator,
+                values,
+                range.to_owned().into()
+            ))?;
+
+            let proto_scalar =
+                
datafusion_proto::protobuf::ScalarValue::decode(scalar_bytes.as_ref())
+                    .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+            ScalarValue::try_from(&proto_scalar).map_err(DataFusionError::from)
+        }
+    }
+
+    fn evaluate_all_with_rank(
+        &self,
+        num_rows: usize,
+        ranks_in_partition: &[Range<usize>],
+    ) -> Result<ArrayRef> {
+        let result = unsafe {
+            let ranks_in_partition = ranks_in_partition
+                .iter()
+                .map(|rank| FFI_Range::from(rank.to_owned()))
+                .collect();
+            (self.evaluator.evaluate_all_with_rank)(
+                &self.evaluator,
+                num_rows,
+                ranks_in_partition,
+            )
+        };
+
+        let array = df_result!(result)?;
+
+        Ok(array.try_into()?)
+    }
+
+    fn supports_bounded_execution(&self) -> bool {
+        self.evaluator.supports_bounded_execution
+    }
+
+    fn uses_window_frame(&self) -> bool {
+        self.evaluator.uses_window_frame
+    }
+
+    fn include_rank(&self) -> bool {
+        self.evaluator.include_rank
+    }
+}
+
+#[cfg(test)]
+mod tests {}
diff --git a/datafusion/ffi/src/udwf/partition_evaluator_args.rs 
b/datafusion/ffi/src/udwf/partition_evaluator_args.rs
new file mode 100644
index 0000000000..e74d47aa1a
--- /dev/null
+++ b/datafusion/ffi/src/udwf/partition_evaluator_args.rs
@@ -0,0 +1,174 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{collections::HashMap, sync::Arc};
+
+use crate::arrow_wrappers::WrappedSchema;
+use abi_stable::{std_types::RVec, StableAbi};
+use arrow::{
+    datatypes::{DataType, Field, Schema, SchemaRef},
+    error::ArrowError,
+    ffi::FFI_ArrowSchema,
+};
+use arrow_schema::FieldRef;
+use datafusion::{
+    error::{DataFusionError, Result},
+    logical_expr::function::PartitionEvaluatorArgs,
+    physical_plan::{expressions::Column, PhysicalExpr},
+    prelude::SessionContext,
+};
+use datafusion_proto::{
+    physical_plan::{
+        from_proto::parse_physical_expr, to_proto::serialize_physical_exprs,
+        DefaultPhysicalExtensionCodec,
+    },
+    protobuf::PhysicalExprNode,
+};
+use prost::Message;
+
+/// A stable struct for sharing [`PartitionEvaluatorArgs`] across FFI boundaries.
+/// For an explanation of each field, see the corresponding function
+/// defined in [`PartitionEvaluatorArgs`].
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_PartitionEvaluatorArgs {
+    /// Each input expression serialized as a protobuf `PhysicalExprNode`.
+    input_exprs: RVec<RVec<u8>>,
+    /// Input fields, each wrapped as an Arrow C schema.
+    /// NOTE(review): the receiving `TryFrom<FFI_PartitionEvaluatorArgs>` never
+    /// reads this field; it recomputes fields from `input_exprs` instead.
+    input_fields: RVec<WrappedSchema>,
+    is_reversed: bool,
+    ignore_nulls: bool,
+    /// Synthetic schema (real types only for referenced columns) used by the
+    /// receiver to decode `input_exprs`.
+    schema: WrappedSchema,
+}
+
+impl TryFrom<PartitionEvaluatorArgs<'_>> for FFI_PartitionEvaluatorArgs {
+    type Error = DataFusionError;
+    fn try_from(args: PartitionEvaluatorArgs) -> Result<Self, DataFusionError> {
+        // This is a bit of a hack. Since PartitionEvaluatorArgs does not carry a schema
+        // around, and instead passes the data types directly, we are unable to decode the
+        // protobuf PhysicalExpr correctly. In evaluating the code, the only place these
+        // appear to be really used are the Column data types. So here we find all of the
+        // required columns and create a schema that has placeholder (Null-typed) fields
+        // except for the ones we require. Ideally we would enhance PartitionEvaluatorArgs
+        // to just pass along the schema, but that is a larger breaking change.
+        let required_columns: HashMap<usize, (&str, &DataType)> = args
+            .input_exprs()
+            .iter()
+            .zip(args.input_fields())
+            .filter_map(|(expr, field)| {
+                expr.as_any()
+                    .downcast_ref::<Column>()
+                    .map(|column| (column.index(), (column.name(), field.data_type())))
+            })
+            .collect();
+
+        // The schema must cover every index up to AND including the largest
+        // referenced column index (a single column at index 0 needs one field).
+        // With no column references the schema is empty.
+        let num_fields = required_columns
+            .keys()
+            .max()
+            .map_or(0, |max_index| max_index + 1);
+        let fields: Vec<_> = (0..num_fields)
+            .map(|idx| match required_columns.get(&idx) {
+                Some((name, data_type)) => Field::new(*name, (*data_type).clone(), true),
+                None => Field::new(
+                    format!("ffi_partition_evaluator_col_{idx}"),
+                    DataType::Null,
+                    true,
+                ),
+            })
+            .collect();
+        let schema = Arc::new(Schema::new(fields));
+
+        let codec = DefaultPhysicalExtensionCodec {};
+        let input_exprs = serialize_physical_exprs(args.input_exprs(), &codec)?
+            .into_iter()
+            .map(|expr_node| expr_node.encode_to_vec().into())
+            .collect();
+
+        let input_fields = args
+            .input_fields()
+            .iter()
+            .map(|input_type| FFI_ArrowSchema::try_from(input_type).map(WrappedSchema))
+            .collect::<Result<Vec<_>, ArrowError>>()?
+            .into();
+
+        let schema: WrappedSchema = schema.into();
+
+        Ok(Self {
+            input_exprs,
+            input_fields,
+            schema,
+            is_reversed: args.is_reversed(),
+            ignore_nulls: args.ignore_nulls(),
+        })
+    }
+}
+
+/// This struct mirrors PartitionEvaluatorArgs except that it contains owned data.
+/// It is necessary to create this struct so that we can parse the protobuf
+/// data across the FFI boundary and turn it into owned data that
+/// PartitionEvaluatorArgs can then reference.
+pub struct ForeignPartitionEvaluatorArgs {
+    /// Expressions decoded from protobuf against the transmitted schema.
+    input_exprs: Vec<Arc<dyn PhysicalExpr>>,
+    /// Fields recomputed from `input_exprs` via `return_field`.
+    input_fields: Vec<FieldRef>,
+    is_reversed: bool,
+    ignore_nulls: bool,
+}
+
+impl TryFrom<FFI_PartitionEvaluatorArgs> for ForeignPartitionEvaluatorArgs {
+    type Error = DataFusionError;
+
+    /// Decode the FFI arguments into owned data. Expressions are parsed against
+    /// the transmitted schema, and the input fields are recomputed from those
+    /// expressions.
+    fn try_from(value: FFI_PartitionEvaluatorArgs) -> Result<Self> {
+        let default_ctx = SessionContext::new();
+        let codec = DefaultPhysicalExtensionCodec {};
+
+        let schema: SchemaRef = value.schema.into();
+
+        // Decode every protobuf node first; the first failure aborts.
+        let mut expr_nodes = Vec::with_capacity(value.input_exprs.len());
+        for encoded in value.input_exprs {
+            let node = PhysicalExprNode::decode(encoded.as_ref())
+                .map_err(|e| DataFusionError::Execution(e.to_string()))?;
+            expr_nodes.push(node);
+        }
+
+        let input_exprs: Vec<Arc<dyn PhysicalExpr>> = expr_nodes
+            .iter()
+            .map(|node| parse_physical_expr(node, &default_ctx, &schema, &codec))
+            .collect::<Result<_>>()?;
+
+        let input_fields: Vec<FieldRef> = input_exprs
+            .iter()
+            .map(|expr| expr.return_field(&schema))
+            .collect::<Result<_>>()?;
+
+        Ok(Self {
+            input_exprs,
+            input_fields,
+            is_reversed: value.is_reversed,
+            ignore_nulls: value.ignore_nulls,
+        })
+    }
+}
+
+impl<'a> From<&'a ForeignPartitionEvaluatorArgs> for PartitionEvaluatorArgs<'a> {
+    /// Borrow the owned arguments as a [`PartitionEvaluatorArgs`].
+    fn from(value: &'a ForeignPartitionEvaluatorArgs) -> Self {
+        let ForeignPartitionEvaluatorArgs {
+            input_exprs,
+            input_fields,
+            is_reversed,
+            ignore_nulls,
+        } = value;
+        PartitionEvaluatorArgs::new(input_exprs, input_fields, *is_reversed, *ignore_nulls)
+    }
+}
+
+#[cfg(test)]
+mod tests {}
diff --git a/datafusion/ffi/src/udwf/range.rs b/datafusion/ffi/src/udwf/range.rs
new file mode 100644
index 0000000000..1ddcc4199f
--- /dev/null
+++ b/datafusion/ffi/src/udwf/range.rs
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ops::Range;
+
+use abi_stable::StableAbi;
+
+/// A stable struct for sharing [`Range`] across FFI boundaries.
+/// Each field mirrors the field of the same name on [`Range`]: `start` is
+/// the inclusive lower bound and `end` is the exclusive upper bound.
+#[repr(C)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_Range {
+    pub start: usize,
+    pub end: usize,
+}
+
+impl From<Range<usize>> for FFI_Range {
+    /// Copies both bounds of a standard half-open range into the
+    /// FFI-safe representation.
+    fn from(value: Range<usize>) -> Self {
+        let Range { start, end } = value;
+        Self { start, end }
+    }
+}
+
+impl From<FFI_Range> for Range<usize> {
+    /// Rebuilds a standard range from its FFI-safe representation,
+    /// keeping `start` and `end` unchanged.
+    fn from(value: FFI_Range) -> Self {
+        let FFI_Range { start, end } = value;
+        start..end
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    /// Converting a `Range` to `FFI_Range` and back must keep both bounds.
+    #[test]
+    fn test_round_trip_ffi_range() {
+        let expected: Range<usize> = 10..30;
+
+        let round_trip = Range::<usize>::from(FFI_Range::from(expected.clone()));
+
+        assert_eq!(round_trip, expected);
+    }
+}
diff --git a/datafusion/ffi/src/volatility.rs b/datafusion/ffi/src/volatility.rs
index 0aaf68a174..f1705da294 100644
--- a/datafusion/ffi/src/volatility.rs
+++ b/datafusion/ffi/src/volatility.rs
@@ -19,7 +19,7 @@ use abi_stable::StableAbi;
 use datafusion::logical_expr::Volatility;
 
 #[repr(C)]
-#[derive(Debug, StableAbi)]
+#[derive(Debug, StableAbi, Clone)]
 #[allow(non_camel_case_types)]
 pub enum FFI_Volatility {
     Immutable,
diff --git a/datafusion/ffi/tests/ffi_udwf.rs b/datafusion/ffi/tests/ffi_udwf.rs
new file mode 100644
index 0000000000..db9ebba0fd
--- /dev/null
+++ b/datafusion/ffi/tests/ffi_udwf.rs
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Add an additional module here for convenience to scope this to only
+/// when the feature integration-tests is built
+#[cfg(feature = "integration-tests")]
+mod tests {
+    use arrow::array::{create_array, ArrayRef};
+    use datafusion::error::{DataFusionError, Result};
+    use datafusion::logical_expr::expr::Sort;
+    use datafusion::logical_expr::{col, ExprFunctionExt, WindowUDF};
+    use datafusion::prelude::SessionContext;
+    use datafusion_ffi::tests::create_record_batch;
+    use datafusion_ffi::tests::utils::get_module;
+    use datafusion_ffi::udwf::ForeignWindowUDF;
+
+    /// Loads the rank window UDF from the external test module, calls it via
+    /// the FFI wrapper ordered by column `a`, and checks the produced ranks.
+    #[tokio::test]
+    async fn test_rank_udwf() -> Result<()> {
+        let module = get_module()?;
+
+        let ffi_rank_func = module.create_rank_udwf().ok_or_else(|| {
+            DataFusionError::NotImplemented(
+                "External module failed to implement create_rank_udwf".to_string(),
+            )
+        })?();
+        let foreign_rank_func: ForeignWindowUDF = (&ffi_rank_func).try_into()?;
+
+        let udwf: WindowUDF = foreign_rank_func.into();
+
+        let ctx = SessionContext::default();
+        let df = ctx.read_batch(create_record_batch(-5, 5))?;
+
+        // `build()` is fallible; propagate its error instead of panicking.
+        let rank_a = udwf
+            .call(vec![])
+            .order_by(vec![Sort::new(col("a"), true, true)])
+            .build()?
+            .alias("rank_a");
+        let df = df.select(vec![col("a"), rank_a])?;
+
+        df.clone().show().await?;
+
+        let result = df.collect().await?;
+        let expected = create_array!(UInt64, [1, 2, 3, 4, 5]) as ArrayRef;
+
+        assert_eq!(result.len(), 1);
+        assert_eq!(result[0].column(1), &expected);
+
+        Ok(())
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org


Reply via email to