timsaucer commented on code in PR #14775:
URL: https://github.com/apache/datafusion/pull/14775#discussion_r2037099896


##########
datafusion/ffi/src/udaf/groups_accumulator.rs:
##########
@@ -0,0 +1,527 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ffi::c_void;
+
+use abi_stable::{
+    std_types::{ROption, RResult, RString, RVec},
+    StableAbi,
+};
+use arrow::{
+    array::{Array, ArrayRef, BooleanArray},
+    error::ArrowError,
+    ffi::to_ffi,
+};
+use datafusion::{
+    error::{DataFusionError, Result},
+    logical_expr::{EmitTo, GroupsAccumulator},
+};
+
+use crate::{
+    arrow_wrappers::{WrappedArray, WrappedSchema},
+    df_result, rresult, rresult_return,
+};
+
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_GroupsAccumulator {
+    pub update_batch: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        values: RVec<WrappedArray>,
+        group_indices: RVec<usize>,
+        opt_filter: ROption<WrappedArray>,
+        total_num_groups: usize,
+    ) -> RResult<(), RString>,
+
+    // Evaluate and return a ScalarValues as protobuf bytes
+    pub evaluate: unsafe extern "C" fn(
+        accumulator: &Self,
+        emit_to: FFI_EmitTo,
+    ) -> RResult<WrappedArray, RString>,
+
+    pub size: unsafe extern "C" fn(accumulator: &Self) -> usize,
+
+    pub state: unsafe extern "C" fn(
+        accumulator: &Self,
+        emit_to: FFI_EmitTo,
+    ) -> RResult<RVec<WrappedArray>, RString>,
+
+    pub merge_batch: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        values: RVec<WrappedArray>,
+        group_indices: RVec<usize>,
+        opt_filter: ROption<WrappedArray>,
+        total_num_groups: usize,
+    ) -> RResult<(), RString>,
+
+    pub convert_to_state: unsafe extern "C" fn(
+        accumulator: &Self,
+        values: RVec<WrappedArray>,
+        opt_filter: ROption<WrappedArray>,
+    )
+        -> RResult<RVec<WrappedArray>, RString>,
+
+    pub supports_convert_to_state: bool,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(accumulator: &mut Self),
+
+    /// Internal data. This is only to be accessed by the provider of the 
accumulator.
+    /// A [`ForeignGroupsAccumulator`] should never attempt to access this 
data.
+    pub private_data: *mut c_void,
+}
+
+unsafe impl Send for FFI_GroupsAccumulator {}
+unsafe impl Sync for FFI_GroupsAccumulator {}
+
+pub struct GroupsAccumulatorPrivateData {
+    pub accumulator: Box<dyn GroupsAccumulator>,
+}
+
+impl FFI_GroupsAccumulator {
+    unsafe fn inner(&self) -> &mut Box<dyn GroupsAccumulator> {
+        let private_data = self.private_data as *mut 
GroupsAccumulatorPrivateData;
+        &mut (*private_data).accumulator
+    }
+}
+
+unsafe extern "C" fn update_batch_fn_wrapper(
+    accumulator: &mut FFI_GroupsAccumulator,
+    values: RVec<WrappedArray>,
+    group_indices: RVec<usize>,
+    opt_filter: ROption<WrappedArray>,
+    total_num_groups: usize,
+) -> RResult<(), RString> {
+    let accumulator = accumulator.inner();
+
+    let values_arrays = values
+        .into_iter()
+        .map(|v| v.try_into().map_err(DataFusionError::from))
+        .collect::<Result<Vec<ArrayRef>>>();
+    let values_arrays = rresult_return!(values_arrays);
+
+    let group_indices: Vec<usize> = group_indices.into_iter().collect();
+
+    let maybe_filter = opt_filter.into_option().and_then(|filter| {
+        match ArrayRef::try_from(filter) {
+            Ok(v) => Some(v),
+            Err(e) => {
+                log::warn!("Error during FFI array conversion. Ignoring 
optional filter in groups accumulator. {}", e);
+                None
+            }
+        }
+    }).map(|arr| arr.into_data());
+    let opt_filter = maybe_filter.map(BooleanArray::from);
+
+    rresult!(accumulator.update_batch(
+        &values_arrays,
+        &group_indices,
+        opt_filter.as_ref(),
+        total_num_groups
+    ))
+}
+
+unsafe extern "C" fn evaluate_fn_wrapper(
+    accumulator: &FFI_GroupsAccumulator,
+    emit_to: FFI_EmitTo,
+) -> RResult<WrappedArray, RString> {
+    let accumulator = accumulator.inner();
+
+    let result = rresult_return!(accumulator.evaluate(emit_to.into()));
+
+    rresult!(WrappedArray::try_from(&result))
+}
+
+unsafe extern "C" fn size_fn_wrapper(accumulator: &FFI_GroupsAccumulator) -> 
usize {
+    let accumulator = accumulator.inner();
+    accumulator.size()
+}
+
+unsafe extern "C" fn state_fn_wrapper(
+    accumulator: &FFI_GroupsAccumulator,
+    emit_to: FFI_EmitTo,
+) -> RResult<RVec<WrappedArray>, RString> {
+    let accumulator = accumulator.inner();
+
+    let state = rresult_return!(accumulator.state(emit_to.into()));
+    rresult!(state
+        .into_iter()
+        .map(|arr| WrappedArray::try_from(&arr).map_err(DataFusionError::from))
+        .collect::<Result<RVec<_>>>())
+}
+
+unsafe extern "C" fn merge_batch_fn_wrapper(
+    accumulator: &mut FFI_GroupsAccumulator,
+    values: RVec<WrappedArray>,
+    group_indices: RVec<usize>,
+    opt_filter: ROption<WrappedArray>,
+    total_num_groups: usize,
+) -> RResult<(), RString> {
+    let accumulator = accumulator.inner();
+    let values_arrays = values
+        .into_iter()
+        .map(|v| v.try_into().map_err(DataFusionError::from))
+        .collect::<Result<Vec<ArrayRef>>>();
+    let values_arrays = rresult_return!(values_arrays);
+
+    let group_indices: Vec<usize> = group_indices.into_iter().collect();
+
+    let maybe_filter = opt_filter.into_option().and_then(|filter| {
+        match ArrayRef::try_from(filter) {
+            Ok(v) => Some(v),
+            Err(e) => {
+                log::warn!("Error during FFI array conversion. Ignoring 
optional filter in groups accumulator. {}", e);
+                None
+            }
+        }

Review Comment:
   I can't remember why I made this only log a warning on failure. Since the
function returns a `Result`, I think we should restructure it so that a failed
filter conversion returns an error instead of silently ignoring the filter.



##########
datafusion/ffi/src/udaf/groups_accumulator.rs:
##########
@@ -0,0 +1,527 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ffi::c_void;
+
+use abi_stable::{
+    std_types::{ROption, RResult, RString, RVec},
+    StableAbi,
+};
+use arrow::{
+    array::{Array, ArrayRef, BooleanArray},
+    error::ArrowError,
+    ffi::to_ffi,
+};
+use datafusion::{
+    error::{DataFusionError, Result},
+    logical_expr::{EmitTo, GroupsAccumulator},
+};
+
+use crate::{
+    arrow_wrappers::{WrappedArray, WrappedSchema},
+    df_result, rresult, rresult_return,
+};
+
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_GroupsAccumulator {
+    pub update_batch: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        values: RVec<WrappedArray>,
+        group_indices: RVec<usize>,
+        opt_filter: ROption<WrappedArray>,
+        total_num_groups: usize,
+    ) -> RResult<(), RString>,
+
+    // Evaluate and return a ScalarValues as protobuf bytes
+    pub evaluate: unsafe extern "C" fn(
+        accumulator: &Self,
+        emit_to: FFI_EmitTo,
+    ) -> RResult<WrappedArray, RString>,
+
+    pub size: unsafe extern "C" fn(accumulator: &Self) -> usize,
+
+    pub state: unsafe extern "C" fn(
+        accumulator: &Self,
+        emit_to: FFI_EmitTo,
+    ) -> RResult<RVec<WrappedArray>, RString>,
+
+    pub merge_batch: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        values: RVec<WrappedArray>,
+        group_indices: RVec<usize>,
+        opt_filter: ROption<WrappedArray>,
+        total_num_groups: usize,
+    ) -> RResult<(), RString>,
+
+    pub convert_to_state: unsafe extern "C" fn(
+        accumulator: &Self,
+        values: RVec<WrappedArray>,
+        opt_filter: ROption<WrappedArray>,
+    )
+        -> RResult<RVec<WrappedArray>, RString>,
+
+    pub supports_convert_to_state: bool,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(accumulator: &mut Self),
+
+    /// Internal data. This is only to be accessed by the provider of the 
accumulator.
+    /// A [`ForeignGroupsAccumulator`] should never attempt to access this 
data.
+    pub private_data: *mut c_void,
+}
+
+unsafe impl Send for FFI_GroupsAccumulator {}
+unsafe impl Sync for FFI_GroupsAccumulator {}
+
+pub struct GroupsAccumulatorPrivateData {
+    pub accumulator: Box<dyn GroupsAccumulator>,
+}
+
+impl FFI_GroupsAccumulator {
+    unsafe fn inner(&self) -> &mut Box<dyn GroupsAccumulator> {
+        let private_data = self.private_data as *mut 
GroupsAccumulatorPrivateData;
+        &mut (*private_data).accumulator
+    }
+}
+
+unsafe extern "C" fn update_batch_fn_wrapper(
+    accumulator: &mut FFI_GroupsAccumulator,
+    values: RVec<WrappedArray>,
+    group_indices: RVec<usize>,
+    opt_filter: ROption<WrappedArray>,
+    total_num_groups: usize,
+) -> RResult<(), RString> {
+    let accumulator = accumulator.inner();
+
+    let values_arrays = values
+        .into_iter()
+        .map(|v| v.try_into().map_err(DataFusionError::from))
+        .collect::<Result<Vec<ArrayRef>>>();
+    let values_arrays = rresult_return!(values_arrays);
+
+    let group_indices: Vec<usize> = group_indices.into_iter().collect();
+
+    let maybe_filter = opt_filter.into_option().and_then(|filter| {
+        match ArrayRef::try_from(filter) {
+            Ok(v) => Some(v),
+            Err(e) => {
+                log::warn!("Error during FFI array conversion. Ignoring 
optional filter in groups accumulator. {}", e);
+                None
+            }
+        }
+    }).map(|arr| arr.into_data());
+    let opt_filter = maybe_filter.map(BooleanArray::from);
+
+    rresult!(accumulator.update_batch(
+        &values_arrays,
+        &group_indices,
+        opt_filter.as_ref(),
+        total_num_groups
+    ))
+}
+
+unsafe extern "C" fn evaluate_fn_wrapper(
+    accumulator: &FFI_GroupsAccumulator,
+    emit_to: FFI_EmitTo,
+) -> RResult<WrappedArray, RString> {
+    let accumulator = accumulator.inner();
+
+    let result = rresult_return!(accumulator.evaluate(emit_to.into()));
+
+    rresult!(WrappedArray::try_from(&result))
+}
+
+unsafe extern "C" fn size_fn_wrapper(accumulator: &FFI_GroupsAccumulator) -> 
usize {
+    let accumulator = accumulator.inner();
+    accumulator.size()
+}
+
+unsafe extern "C" fn state_fn_wrapper(
+    accumulator: &FFI_GroupsAccumulator,
+    emit_to: FFI_EmitTo,
+) -> RResult<RVec<WrappedArray>, RString> {
+    let accumulator = accumulator.inner();
+
+    let state = rresult_return!(accumulator.state(emit_to.into()));
+    rresult!(state
+        .into_iter()
+        .map(|arr| WrappedArray::try_from(&arr).map_err(DataFusionError::from))
+        .collect::<Result<RVec<_>>>())
+}
+
+unsafe extern "C" fn merge_batch_fn_wrapper(
+    accumulator: &mut FFI_GroupsAccumulator,
+    values: RVec<WrappedArray>,
+    group_indices: RVec<usize>,
+    opt_filter: ROption<WrappedArray>,
+    total_num_groups: usize,
+) -> RResult<(), RString> {
+    let accumulator = accumulator.inner();
+    let values_arrays = values
+        .into_iter()
+        .map(|v| v.try_into().map_err(DataFusionError::from))
+        .collect::<Result<Vec<ArrayRef>>>();
+    let values_arrays = rresult_return!(values_arrays);
+
+    let group_indices: Vec<usize> = group_indices.into_iter().collect();
+
+    let maybe_filter = opt_filter.into_option().and_then(|filter| {
+        match ArrayRef::try_from(filter) {
+            Ok(v) => Some(v),
+            Err(e) => {
+                log::warn!("Error during FFI array conversion. Ignoring 
optional filter in groups accumulator. {}", e);
+                None
+            }
+        }
+    }).map(|arr| arr.into_data());
+    let opt_filter = maybe_filter.map(BooleanArray::from);
+
+    rresult!(accumulator.merge_batch(
+        &values_arrays,
+        &group_indices,
+        opt_filter.as_ref(),
+        total_num_groups
+    ))
+}
+
+unsafe extern "C" fn convert_to_state_fn_wrapper(
+    accumulator: &FFI_GroupsAccumulator,
+    values: RVec<WrappedArray>,
+    opt_filter: ROption<WrappedArray>,
+) -> RResult<RVec<WrappedArray>, RString> {
+    let accumulator = accumulator.inner();
+    let values = rresult_return!(values
+        .into_iter()
+        .map(|v| ArrayRef::try_from(v).map_err(DataFusionError::from))
+        .collect::<Result<Vec<_>>>());
+
+    let opt_filter = opt_filter.into_option().and_then(|filter| {
+        match ArrayRef::try_from(filter) {
+            Ok(v) => Some(v),
+            Err(e) => {
+                log::warn!("Error during FFI array conversion. Ignoring 
optional filter in groups accumulator. {}", e);
+                None
+            }

Review Comment:
   Same comment as above: return the error instead of logging a warning and
dropping the filter.



##########
datafusion/ffi/tests/ffi_integration.rs:
##########


Review Comment:
   If you rebase on `main`, you'll see that I've started moving the FFI integration
tests into their own files, because `ffi_integration.rs` was getting very large. I
recommend putting these tests into a separate file, such as `ffi_udaf.rs`.



##########
datafusion/ffi/src/udaf/groups_accumulator.rs:
##########
@@ -0,0 +1,527 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::ffi::c_void;
+
+use abi_stable::{
+    std_types::{ROption, RResult, RString, RVec},
+    StableAbi,
+};
+use arrow::{
+    array::{Array, ArrayRef, BooleanArray},
+    error::ArrowError,
+    ffi::to_ffi,
+};
+use datafusion::{
+    error::{DataFusionError, Result},
+    logical_expr::{EmitTo, GroupsAccumulator},
+};
+
+use crate::{
+    arrow_wrappers::{WrappedArray, WrappedSchema},
+    df_result, rresult, rresult_return,
+};
+
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_GroupsAccumulator {
+    pub update_batch: unsafe extern "C" fn(
+        accumulator: &mut Self,
+        values: RVec<WrappedArray>,
+        group_indices: RVec<usize>,
+        opt_filter: ROption<WrappedArray>,
+        total_num_groups: usize,
+    ) -> RResult<(), RString>,

Review Comment:
   We need to add documentation for this struct. It should be very similar to the
documentation on the other FFI structs, except that it should point to `GroupsAccumulator`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to