comphead commented on code in PR #18916:
URL: https://github.com/apache/datafusion/pull/18916#discussion_r2566403654


##########
datafusion/ffi/src/physical_expr/mod.rs:
##########
@@ -0,0 +1,959 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub(crate) mod partitioning;
+pub(crate) mod sort;
+
+use std::{
+    any::Any,
+    ffi::c_void,
+    fmt::{Display, Formatter},
+    hash::{DefaultHasher, Hash, Hasher},
+    sync::Arc,
+};
+
+use abi_stable::{
+    std_types::{ROption, RResult, RString, RVec},
+    StableAbi,
+};
+use arrow::{
+    array::{ArrayRef, BooleanArray, RecordBatch},
+    datatypes::SchemaRef,
+};
+use arrow_schema::{ffi::FFI_ArrowSchema, DataType, Field, FieldRef, Schema};
+use datafusion_common::{exec_datafusion_err, Result};
+use datafusion_expr::{
+    interval_arithmetic::Interval, sort_properties::ExprProperties,
+    statistics::Distribution, ColumnarValue,
+};
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr_common::physical_expr::fmt_sql;
+
+use crate::{
+    arrow_wrappers::{WrappedArray, WrappedSchema},
+    df_result,
+    expr::{
+        columnar_value::FFI_ColumnarValue, distribution::FFI_Distribution,
+        expr_properties::FFI_ExprProperties, interval::FFI_Interval,
+    },
+    record_batch_stream::{record_batch_to_wrapped_array, 
wrapped_array_to_record_batch},
+    rresult, rresult_return,
+    util::FFIResult,
+};
+
+/// FFI-safe wrapper around a [`PhysicalExpr`].
+///
+/// The struct is `#[repr(C)]` and consists of function pointers plus an opaque
+/// `private_data` pointer, so it can be handed across a library boundary. Every
+/// function pointer receives the struct itself as its first argument; only the
+/// library that created the struct may interpret `private_data`.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_PhysicalExpr {
+    /// Counterpart of [`PhysicalExpr::data_type`]; the resulting data type is
+    /// returned encoded as a [`WrappedSchema`].
+    pub data_type: unsafe extern "C" fn(
+        &Self,
+        input_schema: WrappedSchema,
+    ) -> FFIResult<WrappedSchema>,
+
+    /// Counterpart of [`PhysicalExpr::nullable`].
+    pub nullable:
+        unsafe extern "C" fn(&Self, input_schema: WrappedSchema) -> FFIResult<bool>,
+
+    /// Counterpart of [`PhysicalExpr::evaluate`]; the input record batch is
+    /// passed as a [`WrappedArray`].
+    pub evaluate:
+        unsafe extern "C" fn(&Self, batch: WrappedArray) -> FFIResult<FFI_ColumnarValue>,
+
+    /// Counterpart of [`PhysicalExpr::return_field`]; the resulting field is
+    /// returned encoded as a [`WrappedSchema`].
+    pub return_field: unsafe extern "C" fn(
+        &Self,
+        input_schema: WrappedSchema,
+    ) -> FFIResult<WrappedSchema>,
+
+    /// Counterpart of [`PhysicalExpr::evaluate_selection`]; `selection` is
+    /// expected to decode to a boolean array.
+    pub evaluate_selection: unsafe extern "C" fn(
+        &Self,
+        batch: WrappedArray,
+        selection: WrappedArray,
+    ) -> FFIResult<FFI_ColumnarValue>,
+
+    /// Counterpart of [`PhysicalExpr::children`]; each child is returned as its
+    /// own `FFI_PhysicalExpr`.
+    pub children: unsafe extern "C" fn(&Self) -> RVec<FFI_PhysicalExpr>,
+
+    /// Counterpart of [`PhysicalExpr::with_new_children`].
+    pub new_with_children:
+        unsafe extern "C" fn(&Self, children: &RVec<FFI_PhysicalExpr>) -> FFIResult<Self>,
+
+    /// Counterpart of [`PhysicalExpr::evaluate_bounds`].
+    pub evaluate_bounds: unsafe extern "C" fn(
+        &Self,
+        children: &RVec<FFI_Interval>,
+    ) -> FFIResult<FFI_Interval>,
+
+    /// Counterpart of [`PhysicalExpr::propagate_constraints`]; the optional
+    /// result is returned as an `ROption`.
+    pub propagate_constraints:
+        unsafe extern "C" fn(
+            &Self,
+            interval: &FFI_Interval,
+            children: &RVec<FFI_Interval>,
+        ) -> FFIResult<ROption<RVec<FFI_Interval>>>,
+
+    /// Counterpart of [`PhysicalExpr::evaluate_statistics`].
+    pub evaluate_statistics: unsafe extern "C" fn(
+        &Self,
+        children: &RVec<FFI_Distribution>,
+    ) -> FFIResult<FFI_Distribution>,
+
+    /// Counterpart of [`PhysicalExpr::propagate_statistics`]; the optional
+    /// result is returned as an `ROption`.
+    pub propagate_statistics:
+        unsafe extern "C" fn(
+            &Self,
+            parent: &FFI_Distribution,
+            children: &RVec<FFI_Distribution>,
+        ) -> FFIResult<ROption<RVec<FFI_Distribution>>>,
+
+    /// Counterpart of [`PhysicalExpr::get_properties`].
+    pub get_properties: unsafe extern "C" fn(
+        &Self,
+        children: &RVec<FFI_ExprProperties>,
+    ) -> FFIResult<FFI_ExprProperties>,
+
+    /// Returns the SQL-style rendering of the expression (see `fmt_sql`).
+    pub fmt_sql: unsafe extern "C" fn(&Self) -> FFIResult<RString>,
+
+    /// Counterpart of [`PhysicalExpr::snapshot`]; `RNone` mirrors `None`.
+    pub snapshot: unsafe extern "C" fn(&Self) -> FFIResult<ROption<FFI_PhysicalExpr>>,
+
+    /// Counterpart of [`PhysicalExpr::snapshot_generation`].
+    pub snapshot_generation: unsafe extern "C" fn(&Self) -> u64,
+
+    /// Counterpart of [`PhysicalExpr::is_volatile_node`].
+    pub is_volatile_node: unsafe extern "C" fn(&Self) -> bool,
+
+    /// Returns the expression's [`Display`] output.
+    pub display: unsafe extern "C" fn(&Self) -> RString,
+
+    /// Returns a `u64` hash of the expression, computed by the library that
+    /// created this struct.
+    pub hash: unsafe extern "C" fn(&Self) -> u64,
+
+    /// Used to create a clone of this expression on the library that created
+    /// it. This should only need to be called by the receiver of the expression.
+    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
+
+    /// Release the memory of the private data when it is no longer being used.
+    ///
+    /// Requires exclusive (`&mut`) access, so safe Rust cannot call it
+    /// concurrently on the same value. NOTE(review): presumably it must also be
+    /// called at most once per value — confirm against the implementation.
+    pub release: unsafe extern "C" fn(arg: &mut Self),
+
+    /// Return the major DataFusion version number of the library that created
+    /// this expression.
+    pub version: unsafe extern "C" fn() -> u64,
+
+    /// Internal data. This is only to be accessed by the library that created
+    /// the expression. A [`ForeignPhysicalExpr`] should never attempt to access
+    /// this data.
+    pub private_data: *mut c_void,
+
+    /// Utility to identify when FFI objects are accessed locally through
+    /// the foreign interface.
+    pub library_marker_id: extern "C" fn() -> usize,
+}
+
+// SAFETY: apart from function pointers, the only field is the raw
+// `private_data` pointer. In this file it is only read through `inner()`, which
+// hands out a shared `&Arc<dyn PhysicalExpr>`. NOTE(review): soundness relies on
+// the wrapped expression being `Send + Sync` — confirm against the trait bounds
+// of `PhysicalExpr` and against the (not shown) `release` implementation.
+unsafe impl Send for FFI_PhysicalExpr {}
+unsafe impl Sync for FFI_PhysicalExpr {}
+
+impl FFI_PhysicalExpr {
+    /// Returns the expression wrapped by this struct.
+    ///
+    /// Only meaningful in the library that created the struct, where
+    /// `private_data` points at a `PhysicalExprPrivateData`.
+    fn inner(&self) -> &Arc<dyn PhysicalExpr> {
+        // SAFETY: `private_data` is assumed to have been set to a valid
+        // `PhysicalExprPrivateData` by the creating library and not yet
+        // released; the returned reference is tied to the lifetime of `&self`.
+        unsafe {
+            let private_data = self.private_data as *const PhysicalExprPrivateData;
+            &(*private_data).expr
+        }
+    }
+}
+
+/// Library-side state referenced by `FFI_PhysicalExpr::private_data`.
+struct PhysicalExprPrivateData {
+    /// The expression that the FFI function pointers forward to.
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// FFI entry point for `data_type`: resolves the expression's output type
+/// against `input_schema` and returns it encoded as an Arrow FFI schema.
+unsafe extern "C" fn data_type_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+    input_schema: WrappedSchema,
+) -> FFIResult<WrappedSchema> {
+    let input_schema: SchemaRef = input_schema.into();
+    let encoded = expr.inner().data_type(&input_schema).and_then(|data_type| {
+        FFI_ArrowSchema::try_from(data_type)
+            .map(WrappedSchema)
+            .map_err(Into::into)
+    });
+    rresult!(encoded)
+}
+
+/// FFI entry point for `nullable`: reports whether the expression is nullable
+/// for the given `input_schema`.
+unsafe extern "C" fn nullable_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+    input_schema: WrappedSchema,
+) -> FFIResult<bool> {
+    let input_schema: SchemaRef = input_schema.into();
+    let is_nullable = expr.inner().nullable(&input_schema);
+    rresult!(is_nullable)
+}
+
+/// FFI entry point for `evaluate`: decodes the incoming record batch, evaluates
+/// the expression on it and encodes the result as an `FFI_ColumnarValue`.
+unsafe extern "C" fn evaluate_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+    batch: WrappedArray,
+) -> FFIResult<FFI_ColumnarValue> {
+    let record_batch = rresult_return!(wrapped_array_to_record_batch(batch));
+    let value = expr.inner().evaluate(&record_batch);
+    rresult!(value.and_then(FFI_ColumnarValue::try_from))
+}
+
+/// FFI entry point for `return_field`: computes the output field for
+/// `input_schema` and returns it encoded as an Arrow FFI schema.
+unsafe extern "C" fn return_field_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+    input_schema: WrappedSchema,
+) -> FFIResult<WrappedSchema> {
+    let input_schema: SchemaRef = input_schema.into();
+    let field = expr.inner().return_field(&input_schema);
+    let encoded = field.and_then(|field| {
+        FFI_ArrowSchema::try_from(&field)
+            .map(WrappedSchema)
+            .map_err(Into::into)
+    });
+    rresult!(encoded)
+}
+
+/// FFI entry point for `evaluate_selection`: decodes the record batch and the
+/// selection array, then forwards both to `PhysicalExpr::evaluate_selection`.
+///
+/// Returns an error if `selection` does not decode to a [`BooleanArray`].
+unsafe extern "C" fn evaluate_selection_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+    batch: WrappedArray,
+    selection: WrappedArray,
+) -> FFIResult<FFI_ColumnarValue> {
+    let batch = rresult_return!(wrapped_array_to_record_batch(batch));
+    let selection: ArrayRef = rresult_return!(selection.try_into());
+    // Build the error lazily: `exec_datafusion_err!` constructs and formats an
+    // error value, which is wasted work on the (common) success path.
+    let selection = rresult_return!(selection
+        .as_any()
+        .downcast_ref::<BooleanArray>()
+        .ok_or_else(|| exec_datafusion_err!("Unexpected selection array type")));
+    rresult!(expr
+        .inner()
+        .evaluate_selection(&batch, selection)
+        .and_then(FFI_ColumnarValue::try_from))
+}
+
+/// FFI entry point for `children`: wraps each child expression in its own
+/// `FFI_PhysicalExpr`.
+unsafe extern "C" fn children_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+) -> RVec<FFI_PhysicalExpr> {
+    let inner = expr.inner();
+    inner
+        .children()
+        .into_iter()
+        .map(Arc::clone)
+        .map(FFI_PhysicalExpr::from)
+        .collect()
+}
+
+/// FFI entry point for `with_new_children`: converts the foreign children back
+/// into physical expressions and rebuilds this expression around them.
+unsafe extern "C" fn new_with_children_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+    children: &RVec<FFI_PhysicalExpr>,
+) -> FFIResult<FFI_PhysicalExpr> {
+    let new_children: Vec<Arc<dyn PhysicalExpr>> =
+        children.iter().map(Into::into).collect();
+    let rebuilt = Arc::clone(expr.inner()).with_new_children(new_children);
+    rresult!(rebuilt.map(FFI_PhysicalExpr::from))
+}
+
+/// FFI entry point for `evaluate_bounds`: decodes the children's intervals and
+/// computes this expression's interval from them.
+unsafe extern "C" fn evaluate_bounds_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+    children: &RVec<FFI_Interval>,
+) -> FFIResult<FFI_Interval> {
+    let decoded = rresult_return!(children
+        .iter()
+        .map(Interval::try_from)
+        .collect::<Result<Vec<_>>>());
+    let child_refs: Vec<&Interval> = decoded.iter().collect();
+    let bounds = expr.inner().evaluate_bounds(&child_refs);
+    rresult!(bounds.and_then(FFI_Interval::try_from))
+}
+
+/// FFI entry point for `propagate_constraints`: decodes the parent interval and
+/// the children's intervals, propagates the constraint, and re-encodes the
+/// optional per-child result.
+unsafe extern "C" fn propagate_constraints_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+    interval: &FFI_Interval,
+    children: &RVec<FFI_Interval>,
+) -> FFIResult<ROption<RVec<FFI_Interval>>> {
+    let parent = rresult_return!(Interval::try_from(interval));
+    let decoded = rresult_return!(children
+        .iter()
+        .map(Interval::try_from)
+        .collect::<Result<Vec<_>>>());
+    let child_refs: Vec<&Interval> = decoded.iter().collect();
+
+    let propagated =
+        rresult_return!(expr.inner().propagate_constraints(&parent, &child_refs));
+
+    let Some(intervals) = propagated else {
+        return RResult::ROk(ROption::RNone);
+    };
+    let encoded = rresult_return!(intervals
+        .into_iter()
+        .map(FFI_Interval::try_from)
+        .collect::<Result<RVec<_>>>());
+    RResult::ROk(ROption::RSome(encoded))
+}
+
+/// FFI entry point for `evaluate_statistics`: decodes the children's
+/// distributions and computes this expression's output distribution.
+unsafe extern "C" fn evaluate_statistics_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+    children: &RVec<FFI_Distribution>,
+) -> FFIResult<FFI_Distribution> {
+    let decoded = rresult_return!(children
+        .iter()
+        .map(Distribution::try_from)
+        .collect::<Result<Vec<_>>>());
+    let child_refs: Vec<&Distribution> = decoded.iter().collect();
+    let distribution = expr.inner().evaluate_statistics(&child_refs);
+    rresult!(distribution.and_then(|dist| FFI_Distribution::try_from(&dist)))
+}
+
+/// FFI entry point for `propagate_statistics`: decodes the parent and children
+/// distributions, propagates statistics, and re-encodes the optional per-child
+/// result.
+unsafe extern "C" fn propagate_statistics_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+    parent: &FFI_Distribution,
+    children: &RVec<FFI_Distribution>,
+) -> FFIResult<ROption<RVec<FFI_Distribution>>> {
+    let parent = rresult_return!(Distribution::try_from(parent));
+    let decoded = rresult_return!(children
+        .iter()
+        .map(Distribution::try_from)
+        .collect::<Result<Vec<_>>>());
+    let child_refs: Vec<&Distribution> = decoded.iter().collect();
+
+    let propagated =
+        rresult_return!(expr.inner().propagate_statistics(&parent, &child_refs));
+
+    let Some(distributions) = propagated else {
+        return RResult::ROk(ROption::RNone);
+    };
+    let encoded = rresult_return!(distributions
+        .iter()
+        .map(FFI_Distribution::try_from)
+        .collect::<Result<RVec<_>>>());
+    RResult::ROk(ROption::RSome(encoded))
+}
+
+/// FFI entry point for `get_properties`: decodes the children's properties and
+/// derives this expression's properties from them.
+unsafe extern "C" fn get_properties_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+    children: &RVec<FFI_ExprProperties>,
+) -> FFIResult<FFI_ExprProperties> {
+    let child_properties = rresult_return!(children
+        .iter()
+        .map(ExprProperties::try_from)
+        .collect::<Result<Vec<_>>>());
+    let properties = expr.inner().get_properties(&child_properties);
+    rresult!(properties.and_then(|props| FFI_ExprProperties::try_from(&props)))
+}
+
+/// FFI entry point for `fmt_sql`: renders the wrapped expression with the
+/// `fmt_sql` helper. This never fails, so the result is always `ROk`.
+unsafe extern "C" fn fmt_sql_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+) -> FFIResult<RString> {
+    let sql = fmt_sql(expr.inner().as_ref()).to_string();
+    RResult::ROk(RString::from(sql))
+}
+
+/// FFI entry point for `snapshot`: returns the expression's snapshot, if any,
+/// wrapped as a new `FFI_PhysicalExpr`.
+unsafe extern "C" fn snapshot_fn_wrapper(
+    expr: &FFI_PhysicalExpr,
+) -> FFIResult<ROption<FFI_PhysicalExpr>> {
+    let snapshot = expr.inner().snapshot();
+    rresult!(snapshot.map(|maybe_snapshot| {
+        ROption::from(maybe_snapshot.map(FFI_PhysicalExpr::from))
+    }))
+}
+
+/// FFI entry point for `snapshot_generation`: forwards to the wrapped expression.
+unsafe extern "C" fn snapshot_generation_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 {
+    expr.inner().snapshot_generation()
+}
+
+/// FFI entry point for `is_volatile_node`: forwards to the wrapped expression.
+unsafe extern "C" fn is_volatile_node_fn_wrapper(expr: &FFI_PhysicalExpr) -> bool {
+    expr.inner().is_volatile_node()
+}
+
+/// FFI entry point for `Display`: returns the wrapped expression's `Display`
+/// output as an `RString`.
+unsafe extern "C" fn display_fn_wrapper(expr: &FFI_PhysicalExpr) -> RString {
+    RString::from(expr.inner().to_string())
+}
+
+/// FFI entry point for `Hash`: hashes the wrapped expression with a freshly
+/// created `DefaultHasher` and returns the resulting `u64`.
+unsafe extern "C" fn hash_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 {
+    let mut hasher = DefaultHasher::new();
+    expr.inner().hash(&mut hasher);
+    hasher.finish()
+}
+
+unsafe extern "C" fn release_fn_wrapper(expr: &mut FFI_PhysicalExpr) {

Review Comment:
   Is there any chance of a double free if this wrapper is called concurrently? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to