This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 2a7f64a85e feat: Add Window UDFs to FFI Crate (#16261) 2a7f64a85e is described below commit 2a7f64a85e3d98c51c106607a425d73d2b839e82 Author: Tim Saucer <timsau...@gmail.com> AuthorDate: Thu Jun 5 16:42:56 2025 -0400 feat: Add Window UDFs to FFI Crate (#16261) * Initial commit of UDWF via FFI * Work in progress on integration testing of udwf * Rebase due to UDF changes upstream --- datafusion/ffi/src/lib.rs | 1 + datafusion/ffi/src/tests/mod.rs | 9 +- datafusion/ffi/src/tests/udf_udaf_udwf.rs | 20 +- datafusion/ffi/src/udwf/mod.rs | 366 +++++++++++++++++++++ datafusion/ffi/src/udwf/partition_evaluator.rs | 320 ++++++++++++++++++ .../ffi/src/udwf/partition_evaluator_args.rs | 174 ++++++++++ datafusion/ffi/src/udwf/range.rs | 64 ++++ datafusion/ffi/src/volatility.rs | 2 +- datafusion/ffi/tests/ffi_udwf.rs | 68 ++++ 9 files changed, 1019 insertions(+), 5 deletions(-) diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs index 755c460f31..ff641e8315 100644 --- a/datafusion/ffi/src/lib.rs +++ b/datafusion/ffi/src/lib.rs @@ -37,6 +37,7 @@ pub mod table_source; pub mod udaf; pub mod udf; pub mod udtf; +pub mod udwf; pub mod util; pub mod volatility; diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs index f65ed7441b..db596f51fc 100644 --- a/datafusion/ffi/src/tests/mod.rs +++ b/datafusion/ffi/src/tests/mod.rs @@ -31,6 +31,8 @@ use crate::{catalog_provider::FFI_CatalogProvider, udtf::FFI_TableFunction}; use crate::udaf::FFI_AggregateUDF; +use crate::udwf::FFI_WindowUDF; + use super::{table_provider::FFI_TableProvider, udf::FFI_ScalarUDF}; use arrow::array::RecordBatch; use async_provider::create_async_table_provider; @@ -40,8 +42,8 @@ use datafusion::{ }; use sync_provider::create_sync_table_provider; use udf_udaf_udwf::{ - create_ffi_abs_func, create_ffi_random_func, create_ffi_stddev_func, - create_ffi_sum_func, create_ffi_table_func, + create_ffi_abs_func, 
create_ffi_random_func, create_ffi_rank_func, + create_ffi_stddev_func, create_ffi_sum_func, create_ffi_table_func, }; mod async_provider; @@ -76,6 +78,8 @@ pub struct ForeignLibraryModule { /// Create a grouping UDAF using stddev pub create_stddev_udaf: extern "C" fn() -> FFI_AggregateUDF, + pub create_rank_udwf: extern "C" fn() -> FFI_WindowUDF, + pub version: extern "C" fn() -> u64, } @@ -125,6 +129,7 @@ pub fn get_foreign_library_module() -> ForeignLibraryModuleRef { create_table_function: create_ffi_table_func, create_sum_udaf: create_ffi_sum_func, create_stddev_udaf: create_ffi_stddev_func, + create_rank_udwf: create_ffi_rank_func, version: super::version, } .leak_into_prefix() diff --git a/datafusion/ffi/src/tests/udf_udaf_udwf.rs b/datafusion/ffi/src/tests/udf_udaf_udwf.rs index 6aa69bdd0c..55e31ef3ab 100644 --- a/datafusion/ffi/src/tests/udf_udaf_udwf.rs +++ b/datafusion/ffi/src/tests/udf_udaf_udwf.rs @@ -15,13 +15,17 @@ // specific language governing permissions and limitations // under the License. 
-use crate::{udaf::FFI_AggregateUDF, udf::FFI_ScalarUDF, udtf::FFI_TableFunction}; +use crate::{ + udaf::FFI_AggregateUDF, udf::FFI_ScalarUDF, udtf::FFI_TableFunction, + udwf::FFI_WindowUDF, +}; use datafusion::{ catalog::TableFunctionImpl, functions::math::{abs::AbsFunc, random::RandomFunc}, functions_aggregate::{stddev::Stddev, sum::Sum}, functions_table::generate_series::RangeFunc, - logical_expr::{AggregateUDF, ScalarUDF}, + functions_window::rank::Rank, + logical_expr::{AggregateUDF, ScalarUDF, WindowUDF}, }; use std::sync::Arc; @@ -55,3 +59,15 @@ pub(crate) extern "C" fn create_ffi_stddev_func() -> FFI_AggregateUDF { udaf.into() } + +pub(crate) extern "C" fn create_ffi_rank_func() -> FFI_WindowUDF { + let udwf: Arc<WindowUDF> = Arc::new( + Rank::new( + "rank_demo".to_string(), + datafusion::functions_window::rank::RankType::Basic, + ) + .into(), + ); + + udwf.into() +} diff --git a/datafusion/ffi/src/udwf/mod.rs b/datafusion/ffi/src/udwf/mod.rs new file mode 100644 index 0000000000..aaa3f5c992 --- /dev/null +++ b/datafusion/ffi/src/udwf/mod.rs @@ -0,0 +1,366 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{ffi::c_void, sync::Arc}; + +use abi_stable::{ + std_types::{ROption, RResult, RString, RVec}, + StableAbi, +}; +use arrow::datatypes::Schema; +use arrow::{ + compute::SortOptions, + datatypes::{DataType, SchemaRef}, +}; +use arrow_schema::{Field, FieldRef}; +use datafusion::{ + error::DataFusionError, + logical_expr::{ + function::WindowUDFFieldArgs, type_coercion::functions::fields_with_window_udf, + PartitionEvaluator, + }, +}; +use datafusion::{ + error::Result, + logical_expr::{Signature, WindowUDF, WindowUDFImpl}, +}; +use partition_evaluator::{FFI_PartitionEvaluator, ForeignPartitionEvaluator}; +use partition_evaluator_args::{ + FFI_PartitionEvaluatorArgs, ForeignPartitionEvaluatorArgs, +}; +mod partition_evaluator; +mod partition_evaluator_args; +mod range; + +use crate::util::{rvec_wrapped_to_vec_fieldref, vec_fieldref_to_rvec_wrapped}; +use crate::{ + arrow_wrappers::WrappedSchema, + df_result, rresult, rresult_return, + util::{rvec_wrapped_to_vec_datatype, vec_datatype_to_rvec_wrapped}, + volatility::FFI_Volatility, +}; + +/// A stable struct for sharing a [`WindowUDF`] across FFI boundaries. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_WindowUDF { + /// FFI equivalent to the `name` of a [`WindowUDF`] + pub name: RString, + + /// FFI equivalent to the `aliases` of a [`WindowUDF`] + pub aliases: RVec<RString>, + + /// FFI equivalent to the `volatility` of a [`WindowUDF`] + pub volatility: FFI_Volatility, + + pub partition_evaluator: + unsafe extern "C" fn( + udwf: &Self, + args: FFI_PartitionEvaluatorArgs, + ) -> RResult<FFI_PartitionEvaluator, RString>, + + pub field: unsafe extern "C" fn( + udwf: &Self, + input_types: RVec<WrappedSchema>, + display_name: RString, + ) -> RResult<WrappedSchema, RString>, + + /// Performs type coercion. To simplify this interface, all UDFs are treated as having + /// user defined signatures, which will in turn cause coerce_types to be called. 
This + /// call should be transparent to most users as the internal function performs the + /// appropriate calls on the underlying [`WindowUDF`] + pub coerce_types: unsafe extern "C" fn( + udf: &Self, + arg_types: RVec<WrappedSchema>, + ) -> RResult<RVec<WrappedSchema>, RString>, + + pub sort_options: ROption<FFI_SortOptions>, + + /// Used to create a clone on the provider of the udf. This should + /// only need to be called by the receiver of the udf. + pub clone: unsafe extern "C" fn(udf: &Self) -> Self, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(udf: &mut Self), + + /// Internal data. This is only to be accessed by the provider of the udf. + /// A [`ForeignWindowUDF`] should never attempt to access this data. + pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_WindowUDF {} +unsafe impl Sync for FFI_WindowUDF {} + +pub struct WindowUDFPrivateData { + pub udf: Arc<WindowUDF>, +} + +impl FFI_WindowUDF { + unsafe fn inner(&self) -> &Arc<WindowUDF> { + let private_data = self.private_data as *const WindowUDFPrivateData; + &(*private_data).udf + } +} + +unsafe extern "C" fn partition_evaluator_fn_wrapper( + udwf: &FFI_WindowUDF, + args: FFI_PartitionEvaluatorArgs, +) -> RResult<FFI_PartitionEvaluator, RString> { + let inner = udwf.inner(); + + let args = rresult_return!(ForeignPartitionEvaluatorArgs::try_from(args)); + + let evaluator = rresult_return!(inner.partition_evaluator_factory((&args).into())); + + RResult::ROk(evaluator.into()) +} + +unsafe extern "C" fn field_fn_wrapper( + udwf: &FFI_WindowUDF, + input_fields: RVec<WrappedSchema>, + display_name: RString, +) -> RResult<WrappedSchema, RString> { + let inner = udwf.inner(); + + let input_fields = rresult_return!(rvec_wrapped_to_vec_fieldref(&input_fields)); + + let field = rresult_return!(inner.field(WindowUDFFieldArgs::new( + &input_fields, + display_name.as_str() + ))); + + let schema = Arc::new(Schema::new(vec![field])); 
+ + RResult::ROk(WrappedSchema::from(schema)) +} + +unsafe extern "C" fn coerce_types_fn_wrapper( + udwf: &FFI_WindowUDF, + arg_types: RVec<WrappedSchema>, +) -> RResult<RVec<WrappedSchema>, RString> { + let inner = udwf.inner(); + + let arg_fields = rresult_return!(rvec_wrapped_to_vec_datatype(&arg_types)) + .into_iter() + .map(|dt| Field::new("f", dt, false)) + .map(Arc::new) + .collect::<Vec<_>>(); + + let return_fields = rresult_return!(fields_with_window_udf(&arg_fields, inner)); + let return_types = return_fields + .into_iter() + .map(|f| f.data_type().to_owned()) + .collect::<Vec<_>>(); + + rresult!(vec_datatype_to_rvec_wrapped(&return_types)) +} + +unsafe extern "C" fn release_fn_wrapper(udwf: &mut FFI_WindowUDF) { + let private_data = Box::from_raw(udwf.private_data as *mut WindowUDFPrivateData); + drop(private_data); +} + +unsafe extern "C" fn clone_fn_wrapper(udwf: &FFI_WindowUDF) -> FFI_WindowUDF { + // let private_data = udf.private_data as *const WindowUDFPrivateData; + // let udf_data = &(*private_data); + + // let private_data = Box::new(WindowUDFPrivateData { + // udf: Arc::clone(&udf_data.udf), + // }); + let private_data = Box::new(WindowUDFPrivateData { + udf: Arc::clone(udwf.inner()), + }); + + FFI_WindowUDF { + name: udwf.name.clone(), + aliases: udwf.aliases.clone(), + volatility: udwf.volatility.clone(), + partition_evaluator: partition_evaluator_fn_wrapper, + sort_options: udwf.sort_options.clone(), + coerce_types: coerce_types_fn_wrapper, + field: field_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + } +} + +impl Clone for FFI_WindowUDF { + fn clone(&self) -> Self { + unsafe { (self.clone)(self) } + } +} + +impl From<Arc<WindowUDF>> for FFI_WindowUDF { + fn from(udf: Arc<WindowUDF>) -> Self { + let name = udf.name().into(); + let aliases = udf.aliases().iter().map(|a| a.to_owned().into()).collect(); + let volatility = udf.signature().volatility.into(); + 
let sort_options = udf.sort_options().map(|v| (&v).into()).into(); + + let private_data = Box::new(WindowUDFPrivateData { udf }); + + Self { + name, + aliases, + volatility, + partition_evaluator: partition_evaluator_fn_wrapper, + sort_options, + coerce_types: coerce_types_fn_wrapper, + field: field_fn_wrapper, + clone: clone_fn_wrapper, + release: release_fn_wrapper, + private_data: Box::into_raw(private_data) as *mut c_void, + } + } +} + +impl Drop for FFI_WindowUDF { + fn drop(&mut self) { + unsafe { (self.release)(self) } + } +} + +/// This struct is used to access an UDF provided by a foreign +/// library across a FFI boundary. +/// +/// The ForeignWindowUDF is to be used by the caller of the UDF, so it has +/// no knowledge or access to the private data. All interaction with the UDF +/// must occur through the functions defined in FFI_WindowUDF. +#[derive(Debug)] +pub struct ForeignWindowUDF { + name: String, + aliases: Vec<String>, + udf: FFI_WindowUDF, + signature: Signature, +} + +unsafe impl Send for ForeignWindowUDF {} +unsafe impl Sync for ForeignWindowUDF {} + +impl TryFrom<&FFI_WindowUDF> for ForeignWindowUDF { + type Error = DataFusionError; + + fn try_from(udf: &FFI_WindowUDF) -> Result<Self, Self::Error> { + let name = udf.name.to_owned().into(); + let signature = Signature::user_defined((&udf.volatility).into()); + + let aliases = udf.aliases.iter().map(|s| s.to_string()).collect(); + + Ok(Self { + name, + udf: udf.clone(), + aliases, + signature, + }) + } +} + +impl WindowUDFImpl for ForeignWindowUDF { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn name(&self) -> &str { + &self.name + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> { + unsafe { + let arg_types = vec_datatype_to_rvec_wrapped(arg_types)?; + let result_types = df_result!((self.udf.coerce_types)(&self.udf, arg_types))?; + 
Ok(rvec_wrapped_to_vec_datatype(&result_types)?) + } + } + + fn partition_evaluator( + &self, + args: datafusion::logical_expr::function::PartitionEvaluatorArgs, + ) -> Result<Box<dyn PartitionEvaluator>> { + let evaluator = unsafe { + let args = FFI_PartitionEvaluatorArgs::try_from(args)?; + (self.udf.partition_evaluator)(&self.udf, args) + }; + + df_result!(evaluator).map(|evaluator| { + Box::new(ForeignPartitionEvaluator::from(evaluator)) + as Box<dyn PartitionEvaluator> + }) + } + + fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> { + unsafe { + let input_types = vec_fieldref_to_rvec_wrapped(field_args.input_fields())?; + let schema = df_result!((self.udf.field)( + &self.udf, + input_types, + field_args.name().into() + ))?; + let schema: SchemaRef = schema.into(); + + match schema.fields().is_empty() { + true => Err(DataFusionError::Execution( + "Unable to retrieve field in WindowUDF via FFI".to_string(), + )), + false => Ok(schema.field(0).to_owned().into()), + } + } + } + + fn sort_options(&self) -> Option<SortOptions> { + let options: Option<&FFI_SortOptions> = self.udf.sort_options.as_ref().into(); + options.map(|s| s.into()) + } +} + +#[repr(C)] +#[derive(Debug, StableAbi, Clone)] +#[allow(non_camel_case_types)] +pub struct FFI_SortOptions { + pub descending: bool, + pub nulls_first: bool, +} + +impl From<&SortOptions> for FFI_SortOptions { + fn from(value: &SortOptions) -> Self { + Self { + descending: value.descending, + nulls_first: value.nulls_first, + } + } +} + +impl From<&FFI_SortOptions> for SortOptions { + fn from(value: &FFI_SortOptions) -> Self { + Self { + descending: value.descending, + nulls_first: value.nulls_first, + } + } +} + +#[cfg(test)] +mod tests {} diff --git a/datafusion/ffi/src/udwf/partition_evaluator.rs b/datafusion/ffi/src/udwf/partition_evaluator.rs new file mode 100644 index 0000000000..995d00cce3 --- /dev/null +++ b/datafusion/ffi/src/udwf/partition_evaluator.rs @@ -0,0 +1,320 @@ +// Licensed to the Apache 
Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ffi::c_void, ops::Range}; + +use crate::{arrow_wrappers::WrappedArray, df_result, rresult, rresult_return}; +use abi_stable::{ + std_types::{RResult, RString, RVec}, + StableAbi, +}; +use arrow::{array::ArrayRef, error::ArrowError}; +use datafusion::{ + error::{DataFusionError, Result}, + logical_expr::{window_state::WindowAggState, PartitionEvaluator}, + scalar::ScalarValue, +}; +use prost::Message; + +use super::range::FFI_Range; + +/// A stable struct for sharing [`PartitionEvaluator`] across FFI boundaries. +/// For an explanation of each field, see the corresponding function +/// defined in [`PartitionEvaluator`]. 
+#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_PartitionEvaluator { + pub evaluate_all: unsafe extern "C" fn( + evaluator: &mut Self, + values: RVec<WrappedArray>, + num_rows: usize, + ) -> RResult<WrappedArray, RString>, + + pub evaluate: unsafe extern "C" fn( + evaluator: &mut Self, + values: RVec<WrappedArray>, + range: FFI_Range, + ) -> RResult<RVec<u8>, RString>, + + pub evaluate_all_with_rank: unsafe extern "C" fn( + evaluator: &Self, + num_rows: usize, + ranks_in_partition: RVec<FFI_Range>, + ) + -> RResult<WrappedArray, RString>, + + pub get_range: unsafe extern "C" fn( + evaluator: &Self, + idx: usize, + n_rows: usize, + ) -> RResult<FFI_Range, RString>, + + pub is_causal: bool, + + pub supports_bounded_execution: bool, + pub uses_window_frame: bool, + pub include_rank: bool, + + /// Release the memory of the private data when it is no longer being used. + pub release: unsafe extern "C" fn(evaluator: &mut Self), + + /// Internal data. This is only to be accessed by the provider of the evaluator. + /// A [`ForeignPartitionEvaluator`] should never attempt to access this data. 
+ pub private_data: *mut c_void, +} + +unsafe impl Send for FFI_PartitionEvaluator {} +unsafe impl Sync for FFI_PartitionEvaluator {} + +pub struct PartitionEvaluatorPrivateData { + pub evaluator: Box<dyn PartitionEvaluator>, +} + +impl FFI_PartitionEvaluator { + unsafe fn inner_mut(&mut self) -> &mut Box<(dyn PartitionEvaluator + 'static)> { + let private_data = self.private_data as *mut PartitionEvaluatorPrivateData; + &mut (*private_data).evaluator + } + + unsafe fn inner(&self) -> &(dyn PartitionEvaluator + 'static) { + let private_data = self.private_data as *mut PartitionEvaluatorPrivateData; + (*private_data).evaluator.as_ref() + } +} + +unsafe extern "C" fn evaluate_all_fn_wrapper( + evaluator: &mut FFI_PartitionEvaluator, + values: RVec<WrappedArray>, + num_rows: usize, +) -> RResult<WrappedArray, RString> { + let inner = evaluator.inner_mut(); + + let values_arrays = values + .into_iter() + .map(|v| v.try_into().map_err(DataFusionError::from)) + .collect::<Result<Vec<ArrayRef>>>(); + let values_arrays = rresult_return!(values_arrays); + + let return_array = inner + .evaluate_all(&values_arrays, num_rows) + .and_then(|array| WrappedArray::try_from(&array).map_err(DataFusionError::from)); + + rresult!(return_array) +} + +unsafe extern "C" fn evaluate_fn_wrapper( + evaluator: &mut FFI_PartitionEvaluator, + values: RVec<WrappedArray>, + range: FFI_Range, +) -> RResult<RVec<u8>, RString> { + let inner = evaluator.inner_mut(); + + let values_arrays = values + .into_iter() + .map(|v| v.try_into().map_err(DataFusionError::from)) + .collect::<Result<Vec<ArrayRef>>>(); + let values_arrays = rresult_return!(values_arrays); + + // let return_array = (inner.evaluate(&values_arrays, &range.into())); + // .and_then(|array| WrappedArray::try_from(&array).map_err(DataFusionError::from)); + let scalar_result = rresult_return!(inner.evaluate(&values_arrays, &range.into())); + let proto_result: datafusion_proto::protobuf::ScalarValue = + 
rresult_return!((&scalar_result).try_into()); + + RResult::ROk(proto_result.encode_to_vec().into()) +} + +unsafe extern "C" fn evaluate_all_with_rank_fn_wrapper( + evaluator: &FFI_PartitionEvaluator, + num_rows: usize, + ranks_in_partition: RVec<FFI_Range>, +) -> RResult<WrappedArray, RString> { + let inner = evaluator.inner(); + + let ranks_in_partition = ranks_in_partition + .into_iter() + .map(Range::from) + .collect::<Vec<_>>(); + + let return_array = inner + .evaluate_all_with_rank(num_rows, &ranks_in_partition) + .and_then(|array| WrappedArray::try_from(&array).map_err(DataFusionError::from)); + + rresult!(return_array) +} + +unsafe extern "C" fn get_range_fn_wrapper( + evaluator: &FFI_PartitionEvaluator, + idx: usize, + n_rows: usize, +) -> RResult<FFI_Range, RString> { + let inner = evaluator.inner(); + let range = inner.get_range(idx, n_rows).map(FFI_Range::from); + + rresult!(range) +} + +unsafe extern "C" fn release_fn_wrapper(evaluator: &mut FFI_PartitionEvaluator) { + let private_data = + Box::from_raw(evaluator.private_data as *mut PartitionEvaluatorPrivateData); + drop(private_data); +} + +impl From<Box<dyn PartitionEvaluator>> for FFI_PartitionEvaluator { + fn from(evaluator: Box<dyn PartitionEvaluator>) -> Self { + let is_causal = evaluator.is_causal(); + let supports_bounded_execution = evaluator.supports_bounded_execution(); + let include_rank = evaluator.include_rank(); + let uses_window_frame = evaluator.uses_window_frame(); + + let private_data = PartitionEvaluatorPrivateData { evaluator }; + + Self { + evaluate: evaluate_fn_wrapper, + evaluate_all: evaluate_all_fn_wrapper, + evaluate_all_with_rank: evaluate_all_with_rank_fn_wrapper, + get_range: get_range_fn_wrapper, + is_causal, + supports_bounded_execution, + include_rank, + uses_window_frame, + release: release_fn_wrapper, + private_data: Box::into_raw(Box::new(private_data)) as *mut c_void, + } + } +} + +impl Drop for FFI_PartitionEvaluator { + fn drop(&mut self) { + unsafe { 
(self.release)(self) } + } +} + +/// This struct is used to access an UDF provided by a foreign +/// library across a FFI boundary. +/// +/// The ForeignPartitionEvaluator is to be used by the caller of the UDF, so it has +/// no knowledge or access to the private data. All interaction with the UDF +/// must occur through the functions defined in FFI_PartitionEvaluator. +#[derive(Debug)] +pub struct ForeignPartitionEvaluator { + evaluator: FFI_PartitionEvaluator, +} + +unsafe impl Send for ForeignPartitionEvaluator {} +unsafe impl Sync for ForeignPartitionEvaluator {} + +impl From<FFI_PartitionEvaluator> for ForeignPartitionEvaluator { + fn from(evaluator: FFI_PartitionEvaluator) -> Self { + Self { evaluator } + } +} + +impl PartitionEvaluator for ForeignPartitionEvaluator { + fn memoize(&mut self, _state: &mut WindowAggState) -> Result<()> { + // Exposing `memoize` increases the surface area of the FFI work + // so for now we do not support it. + Ok(()) + } + + fn get_range(&self, idx: usize, n_rows: usize) -> Result<Range<usize>> { + let range = unsafe { (self.evaluator.get_range)(&self.evaluator, idx, n_rows) }; + df_result!(range).map(Range::from) + } + + /// Get whether evaluator needs future data for its result (if so returns `false`) or not + fn is_causal(&self) -> bool { + self.evaluator.is_causal + } + + fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> { + let result = unsafe { + let values = values + .iter() + .map(WrappedArray::try_from) + .collect::<std::result::Result<RVec<_>, ArrowError>>()?; + (self.evaluator.evaluate_all)(&mut self.evaluator, values, num_rows) + }; + + let array = df_result!(result)?; + + Ok(array.try_into()?) 
+ } + + fn evaluate( + &mut self, + values: &[ArrayRef], + range: &Range<usize>, + ) -> Result<ScalarValue> { + unsafe { + let values = values + .iter() + .map(WrappedArray::try_from) + .collect::<std::result::Result<RVec<_>, ArrowError>>()?; + + let scalar_bytes = df_result!((self.evaluator.evaluate)( + &mut self.evaluator, + values, + range.to_owned().into() + ))?; + + let proto_scalar = + datafusion_proto::protobuf::ScalarValue::decode(scalar_bytes.as_ref()) + .map_err(|e| DataFusionError::External(Box::new(e)))?; + + ScalarValue::try_from(&proto_scalar).map_err(DataFusionError::from) + } + } + + fn evaluate_all_with_rank( + &self, + num_rows: usize, + ranks_in_partition: &[Range<usize>], + ) -> Result<ArrayRef> { + let result = unsafe { + let ranks_in_partition = ranks_in_partition + .iter() + .map(|rank| FFI_Range::from(rank.to_owned())) + .collect(); + (self.evaluator.evaluate_all_with_rank)( + &self.evaluator, + num_rows, + ranks_in_partition, + ) + }; + + let array = df_result!(result)?; + + Ok(array.try_into()?) + } + + fn supports_bounded_execution(&self) -> bool { + self.evaluator.supports_bounded_execution + } + + fn uses_window_frame(&self) -> bool { + self.evaluator.uses_window_frame + } + + fn include_rank(&self) -> bool { + self.evaluator.include_rank + } +} + +#[cfg(test)] +mod tests {} diff --git a/datafusion/ffi/src/udwf/partition_evaluator_args.rs b/datafusion/ffi/src/udwf/partition_evaluator_args.rs new file mode 100644 index 0000000000..e74d47aa1a --- /dev/null +++ b/datafusion/ffi/src/udwf/partition_evaluator_args.rs @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{collections::HashMap, sync::Arc}; + +use crate::arrow_wrappers::WrappedSchema; +use abi_stable::{std_types::RVec, StableAbi}; +use arrow::{ + datatypes::{DataType, Field, Schema, SchemaRef}, + error::ArrowError, + ffi::FFI_ArrowSchema, +}; +use arrow_schema::FieldRef; +use datafusion::{ + error::{DataFusionError, Result}, + logical_expr::function::PartitionEvaluatorArgs, + physical_plan::{expressions::Column, PhysicalExpr}, + prelude::SessionContext, +}; +use datafusion_proto::{ + physical_plan::{ + from_proto::parse_physical_expr, to_proto::serialize_physical_exprs, + DefaultPhysicalExtensionCodec, + }, + protobuf::PhysicalExprNode, +}; +use prost::Message; + +/// A stable struct for sharing [`PartitionEvaluatorArgs`] across FFI boundaries. +/// For an explanation of each field, see the corresponding function +/// defined in [`PartitionEvaluatorArgs`]. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_PartitionEvaluatorArgs { + input_exprs: RVec<RVec<u8>>, + input_fields: RVec<WrappedSchema>, + is_reversed: bool, + ignore_nulls: bool, + schema: WrappedSchema, +} + +impl TryFrom<PartitionEvaluatorArgs<'_>> for FFI_PartitionEvaluatorArgs { + type Error = DataFusionError; + fn try_from(args: PartitionEvaluatorArgs) -> Result<Self, DataFusionError> { + // This is a bit of a hack. Since PartitionEvaluatorArgs does not carry a schema + // around, and instead passes the data types directly we are unable to decode the + // protobuf PhysicalExpr correctly. 
In evaluating the code the only place these + // appear to be really used are the Column data types. So here we will find all + // of the required columns and create a schema that has empty fields except for + // the ones we require. Ideally we would enhance PartitionEvaluatorArgs to just + // pass along the schema, but that is a larger breaking change. + let required_columns: HashMap<usize, (&str, &DataType)> = args + .input_exprs() + .iter() + .zip(args.input_fields()) + .filter_map(|(expr, field)| { + expr.as_any() + .downcast_ref::<Column>() + .map(|column| (column.index(), (column.name(), field.data_type()))) + }) + .collect(); + + let max_column = required_columns.keys().max().unwrap_or(&0).to_owned(); + let fields: Vec<_> = (0..max_column) + .map(|idx| match required_columns.get(&idx) { + Some((name, data_type)) => Field::new(*name, (*data_type).clone(), true), + None => Field::new( + format!("ffi_partition_evaluator_col_{idx}"), + DataType::Null, + true, + ), + }) + .collect(); + let schema = Arc::new(Schema::new(fields)); + + let codec = DefaultPhysicalExtensionCodec {}; + let input_exprs = serialize_physical_exprs(args.input_exprs(), &codec)? + .into_iter() + .map(|expr_node| expr_node.encode_to_vec().into()) + .collect(); + + let input_fields = args + .input_fields() + .iter() + .map(|input_type| FFI_ArrowSchema::try_from(input_type).map(WrappedSchema)) + .collect::<Result<Vec<_>, ArrowError>>()? + .into(); + + let schema: WrappedSchema = schema.into(); + + Ok(Self { + input_exprs, + input_fields, + schema, + is_reversed: args.is_reversed(), + ignore_nulls: args.ignore_nulls(), + }) + } +} + +/// This struct mirrors PartitionEvaluatorArgs except that it contains owned data. +/// It is necessary to create this struct so that we can parse the protobuf +/// data across the FFI boundary and turn it into owned data that +/// PartitionEvaluatorArgs can then reference. 
+pub struct ForeignPartitionEvaluatorArgs { + input_exprs: Vec<Arc<dyn PhysicalExpr>>, + input_fields: Vec<FieldRef>, + is_reversed: bool, + ignore_nulls: bool, +} + +impl TryFrom<FFI_PartitionEvaluatorArgs> for ForeignPartitionEvaluatorArgs { + type Error = DataFusionError; + + fn try_from(value: FFI_PartitionEvaluatorArgs) -> Result<Self> { + let default_ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + + let schema: SchemaRef = value.schema.into(); + + let input_exprs = value + .input_exprs + .into_iter() + .map(|input_expr_bytes| PhysicalExprNode::decode(input_expr_bytes.as_ref())) + .collect::<std::result::Result<Vec<_>, prost::DecodeError>>() + .map_err(|e| DataFusionError::Execution(e.to_string()))? + .iter() + .map(|expr_node| { + parse_physical_expr(expr_node, &default_ctx, &schema, &codec) + }) + .collect::<Result<Vec<_>>>()?; + + let input_fields = input_exprs + .iter() + .map(|expr| expr.return_field(&schema)) + .collect::<Result<Vec<_>>>()?; + + Ok(Self { + input_exprs, + input_fields, + is_reversed: value.is_reversed, + ignore_nulls: value.ignore_nulls, + }) + } +} + +impl<'a> From<&'a ForeignPartitionEvaluatorArgs> for PartitionEvaluatorArgs<'a> { + fn from(value: &'a ForeignPartitionEvaluatorArgs) -> Self { + PartitionEvaluatorArgs::new( + &value.input_exprs, + &value.input_fields, + value.is_reversed, + value.ignore_nulls, + ) + } +} + +#[cfg(test)] +mod tests {} diff --git a/datafusion/ffi/src/udwf/range.rs b/datafusion/ffi/src/udwf/range.rs new file mode 100644 index 0000000000..1ddcc4199f --- /dev/null +++ b/datafusion/ffi/src/udwf/range.rs @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::ops::Range; + +use abi_stable::StableAbi; + +/// A stable struct for sharing [`Range`] across FFI boundaries. +/// For an explanation of each field, see the corresponding function +/// defined in [`Range`]. +#[repr(C)] +#[derive(Debug, StableAbi)] +#[allow(non_camel_case_types)] +pub struct FFI_Range { + pub start: usize, + pub end: usize, +} + +impl From<Range<usize>> for FFI_Range { + fn from(value: Range<usize>) -> Self { + Self { + start: value.start, + end: value.end, + } + } +} + +impl From<FFI_Range> for Range<usize> { + fn from(value: FFI_Range) -> Self { + Self { + start: value.start, + end: value.end, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_round_trip_ffi_range() { + let original = Range { start: 10, end: 30 }; + + let ffi_range: FFI_Range = original.clone().into(); + let round_trip: Range<usize> = ffi_range.into(); + + assert_eq!(original, round_trip); + } +} diff --git a/datafusion/ffi/src/volatility.rs b/datafusion/ffi/src/volatility.rs index 0aaf68a174..f1705da294 100644 --- a/datafusion/ffi/src/volatility.rs +++ b/datafusion/ffi/src/volatility.rs @@ -19,7 +19,7 @@ use abi_stable::StableAbi; use datafusion::logical_expr::Volatility; #[repr(C)] -#[derive(Debug, StableAbi)] +#[derive(Debug, StableAbi, Clone)] #[allow(non_camel_case_types)] pub enum FFI_Volatility { Immutable, diff --git a/datafusion/ffi/tests/ffi_udwf.rs 
b/datafusion/ffi/tests/ffi_udwf.rs new file mode 100644 index 0000000000..db9ebba0fd --- /dev/null +++ b/datafusion/ffi/tests/ffi_udwf.rs @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +/// Add an additional module here for convenience to scope this to only +/// when the feature integration-tests is built +#[cfg(feature = "integration-tests")] +mod tests { + use arrow::array::{create_array, ArrayRef}; + use datafusion::error::{DataFusionError, Result}; + use datafusion::logical_expr::expr::Sort; + use datafusion::logical_expr::{col, ExprFunctionExt, WindowUDF}; + use datafusion::prelude::SessionContext; + use datafusion_ffi::tests::create_record_batch; + use datafusion_ffi::tests::utils::get_module; + use datafusion_ffi::udwf::ForeignWindowUDF; + + #[tokio::test] + async fn test_rank_udwf() -> Result<()> { + let module = get_module()?; + + let ffi_rank_func = + module + .create_rank_udwf() + .ok_or(DataFusionError::NotImplemented( + "External table provider failed to implement create_scalar_udf" + .to_string(), + ))?(); + let foreign_rank_func: ForeignWindowUDF = (&ffi_rank_func).try_into()?; + + let udwf: WindowUDF = foreign_rank_func.into(); + + let ctx = SessionContext::default(); + let df = 
ctx.read_batch(create_record_batch(-5, 5))?; + + let df = df.select(vec![ + col("a"), + udwf.call(vec![]) + .order_by(vec![Sort::new(col("a"), true, true)]) + .build() + .unwrap() + .alias("rank_a"), + ])?; + + df.clone().show().await?; + + let result = df.collect().await?; + let expected = create_array!(UInt64, [1, 2, 3, 4, 5]) as ArrayRef; + + assert_eq!(result.len(), 1); + assert_eq!(result[0].column(1), &expected); + + Ok(()) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org