notfilippo commented on code in PR #12920:
URL: https://github.com/apache/datafusion/pull/12920#discussion_r1807792175


##########
datafusion/ffi/Cargo.toml:
##########
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-ffi"
+description = "Foreign Function Interface implementation for DataFusion"
+readme = "README.md"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
+authors = { workspace = true }
+# Specify MSRV here as `cargo msrv` doesn't support workspace version
+rust-version = "1.76"
+
+[lints]
+workspace = true
+
+[lib]
+name = "datafusion_ffi"
+path = "src/lib.rs"
+
+[dependencies]
+abi_stable = "0.11.3"
+arrow = { workspace = true, features = ["ffi"] }
+async-ffi = { version = "0.5.0", features = ["abi_stable"] }
+async-trait = { workspace = true }
+datafusion = { workspace = true, default-features = true }
+datafusion-proto = { workspace = true }
+doc-comment = { workspace = true }
+futures = { workspace = true }
+prost = { workspace = true }
+tokio = { workspace = true }

Review Comment:
   Maybe `tokio` is only needed for tests?



##########
datafusion/ffi/src/table_provider.rs:
##########
@@ -0,0 +1,416 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, ffi::c_void, sync::Arc};
+
+use abi_stable::{
+    std_types::{ROption, RResult, RString, RVec},
+    StableAbi,
+};
+use arrow::{
+    datatypes::{Schema, SchemaRef},
+    ffi::FFI_ArrowSchema,
+};
+use async_ffi::{FfiFuture, FutureExt};
+use async_trait::async_trait;
+use datafusion::{
+    catalog::{Session, TableProvider},
+    datasource::TableType,
+    error::DataFusionError,
+    execution::session_state::SessionStateBuilder,
+    logical_expr::TableProviderFilterPushDown,
+    physical_plan::ExecutionPlan,
+    prelude::{Expr, SessionContext},
+};
+use datafusion_proto::{
+    logical_plan::{
+        from_proto::parse_exprs, to_proto::serialize_exprs, 
DefaultLogicalExtensionCodec,
+    },
+    protobuf::LogicalExprList,
+};
+use prost::Message;
+
+use crate::{
+    plan_properties::WrappedSchema,
+    session_config::ForeignSessionConfig,
+    table_source::{FFI_TableProviderFilterPushDown, FFI_TableType},
+};
+
+use super::{
+    execution_plan::{FFI_ExecutionPlan, ForeignExecutionPlan},
+    session_config::FFI_SessionConfig,
+};
+use datafusion::error::Result;
+
+/// A stable interface for creating a DataFusion TableProvider.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_TableProvider {
+    pub schema: unsafe extern "C" fn(provider: &Self) -> WrappedSchema,
+    pub scan: unsafe extern "C" fn(
+        provider: &Self,
+        session_config: &FFI_SessionConfig,
+        projections: RVec<usize>,
+        filters_serialized: RVec<u8>,
+        limit: ROption<usize>,
+    ) -> FfiFuture<RResult<FFI_ExecutionPlan, RString>>,
+
+    pub table_type: unsafe extern "C" fn(provider: &Self) -> FFI_TableType,
+
+    pub supports_filters_pushdown:
+        unsafe extern "C" fn(
+            provider: &Self,
+            filters_serialized: RVec<u8>,
+        )
+            -> RResult<RVec<FFI_TableProviderFilterPushDown>, RString>,
+
+    pub clone: unsafe extern "C" fn(provider: &Self) -> Self,
+    pub release: unsafe extern "C" fn(arg: &mut Self),
+    pub private_data: *mut c_void,
+}
+
+unsafe impl Send for FFI_TableProvider {}
+unsafe impl Sync for FFI_TableProvider {}
+
+struct ProviderPrivateData {
+    provider: Arc<dyn TableProvider + Send>,
+}
+
+unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> 
WrappedSchema {
+    let private_data = provider.private_data as *const ProviderPrivateData;
+    let provider = &(*private_data).provider;
+
+    // This does silently fail because TableProvider does not return a result.
+    // It expects schema to always pass.
+    let ffi_schema = FFI_ArrowSchema::try_from(provider.schema().as_ref())
+        .unwrap_or(FFI_ArrowSchema::empty());
+
+    WrappedSchema(ffi_schema)
+}
+
+unsafe extern "C" fn table_type_fn_wrapper(
+    provider: &FFI_TableProvider,
+) -> FFI_TableType {
+    let private_data = provider.private_data as *const ProviderPrivateData;
+    let provider = &(*private_data).provider;
+
+    provider.table_type().into()
+}
+
+fn supports_filters_pushdown_internal(
+    provider: &Arc<dyn TableProvider + Send>,
+    filters_serialized: &[u8],
+) -> Result<RVec<FFI_TableProviderFilterPushDown>> {
+    let default_ctx = SessionContext::new();
+    let codec = DefaultLogicalExtensionCodec {};
+
+    let filters = match filters_serialized.is_empty() {
+        true => vec![],
+        false => {
+            let proto_filters = LogicalExprList::decode(filters_serialized)
+                .map_err(|e| DataFusionError::Plan(e.to_string()))?;
+
+            parse_exprs(proto_filters.expr.iter(), &default_ctx, &codec)?
+        }
+    };
+    let filters_borrowed: Vec<&Expr> = filters.iter().collect();
+
+    let results: RVec<_> = provider
+        .supports_filters_pushdown(&filters_borrowed)?
+        .iter()
+        .map(|v| v.into())
+        .collect();
+
+    Ok(results)
+}
+
+unsafe extern "C" fn supports_filters_pushdown_fn_wrapper(
+    provider: &FFI_TableProvider,
+    filters_serialized: RVec<u8>,
+) -> RResult<RVec<FFI_TableProviderFilterPushDown>, RString> {
+    let private_data = provider.private_data as *const ProviderPrivateData;
+    let provider = &(*private_data).provider;
+
+    supports_filters_pushdown_internal(provider, &filters_serialized)
+        .map_err(|e| e.to_string().into())
+        .into()
+}
+
+unsafe extern "C" fn scan_fn_wrapper(
+    provider: &FFI_TableProvider,
+    session_config: &FFI_SessionConfig,
+    projections: RVec<usize>,
+    filters_serialized: RVec<u8>,
+    limit: ROption<usize>,
+) -> FfiFuture<RResult<FFI_ExecutionPlan, RString>> {
+    let private_data = provider.private_data as *mut ProviderPrivateData;
+    let internal_provider = &(*private_data).provider;
+    let session_config = session_config.clone();
+
+    async move {
+        let config = match ForeignSessionConfig::new(&session_config) {
+            Ok(c) => c,
+            Err(e) => return RResult::RErr(e.to_string().into()),
+        };
+        let session = SessionStateBuilder::new()
+            .with_default_features()
+            .with_config(config.0)
+            .build();
+        let ctx = SessionContext::new_with_state(session);
+
+        let filters = match filters_serialized.is_empty() {
+            true => vec![],
+            false => {
+                let default_ctx = SessionContext::new();
+                let codec = DefaultLogicalExtensionCodec {};
+
+                let proto_filters =
+                    match LogicalExprList::decode(filters_serialized.as_ref()) 
{
+                        Ok(f) => f,
+                        Err(e) => return RResult::RErr(e.to_string().into()),
+                    };
+
+                match parse_exprs(proto_filters.expr.iter(), &default_ctx, 
&codec) {
+                    Ok(f) => f,
+                    Err(e) => return RResult::RErr(e.to_string().into()),
+                }
+            }
+        };
+
+        let projections: Vec<_> = projections.into_iter().collect();
+        let maybe_projections = match projections.is_empty() {
+            true => None,
+            false => Some(&projections),
+        };
+
+        let plan = match internal_provider
+            .scan(&ctx.state(), maybe_projections, &filters, limit.into())
+            .await
+        {
+            Ok(p) => p,
+            Err(e) => return RResult::RErr(e.to_string().into()),
+        };
+
+        FFI_ExecutionPlan::new(plan, ctx.task_ctx())
+            .map_err(|e| e.to_string().into())
+            .into()
+    }
+    .into_ffi()
+}
+
+unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) {
+    let private_data = Box::from_raw(provider.private_data as *mut 
ProviderPrivateData);
+    drop(private_data);
+}
+
+unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> 
FFI_TableProvider {
+    let old_private_data = provider.private_data as *const ProviderPrivateData;
+
+    let private_data = Box::into_raw(Box::new(ProviderPrivateData {
+        provider: Arc::clone(&(*old_private_data).provider),
+    })) as *mut c_void;
+
+    FFI_TableProvider {
+        schema: schema_fn_wrapper,
+        scan: scan_fn_wrapper,
+        table_type: table_type_fn_wrapper,
+        supports_filters_pushdown: supports_filters_pushdown_fn_wrapper,
+        clone: clone_fn_wrapper,
+        release: release_fn_wrapper,
+        private_data,
+    }
+}
+
+impl Drop for FFI_TableProvider {
+    fn drop(&mut self) {
+        unsafe { (self.release)(self) }
+    }
+}
+
+impl FFI_TableProvider {
+    /// Creates a new [`FFI_TableProvider`].
+    pub fn new(provider: Arc<dyn TableProvider + Send>) -> Self {
+        let private_data = Box::new(ProviderPrivateData { provider });
+
+        Self {
+            schema: schema_fn_wrapper,
+            scan: scan_fn_wrapper,
+            table_type: table_type_fn_wrapper,
+            supports_filters_pushdown: supports_filters_pushdown_fn_wrapper,
+            clone: clone_fn_wrapper,
+            release: release_fn_wrapper,
+            private_data: Box::into_raw(private_data) as *mut c_void,
+        }
+    }
+}
+
+/// This wrapper struct exists on the reciever side of the FFI interface, so 
it has
+/// no guarantees about being able to access the data in `private_data`. Any 
functions
+/// defined on this struct must only use the stable functions provided in
+/// FFI_TableProvider to interact with the foreign table provider.
+#[derive(Debug)]
+pub struct ForeignTableProvider(FFI_TableProvider);
+
+unsafe impl Send for ForeignTableProvider {}
+unsafe impl Sync for ForeignTableProvider {}
+
+impl ForeignTableProvider {
+    pub fn new(provider: &FFI_TableProvider) -> Self {
+        Self(provider.clone())
+    }
+}
+
+impl Clone for FFI_TableProvider {
+    fn clone(&self) -> Self {
+        unsafe { (self.clone)(self) }
+    }
+}
+
+#[async_trait]
+impl TableProvider for ForeignTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        let schema = unsafe {
+            let wrapped_schema = (self.0.schema)(&self.0);
+            Schema::try_from(&wrapped_schema.0).unwrap_or(Schema::empty())
+        };
+        Arc::new(schema)
+    }
+
+    fn table_type(&self) -> TableType {
+        unsafe { (self.0.table_type)(&self.0).into() }
+    }
+
+    async fn scan(
+        &self,
+        session: &dyn Session,
+        projection: Option<&Vec<usize>>,
+        filters: &[Expr],
+        limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let session_config = FFI_SessionConfig::new(session.config());
+
+        let projections: Option<RVec<usize>> =
+            projection.map(|p| p.iter().map(|v| v.to_owned()).collect());
+
+        let codec = DefaultLogicalExtensionCodec {};
+        let filter_list = LogicalExprList {
+            expr: serialize_exprs(filters, &codec)?,
+        };
+        let filters_serialized = filter_list.encode_to_vec().into();
+
+        let plan = unsafe {
+            let maybe_plan = (self.0.scan)(
+                &self.0,
+                &session_config,
+                projections.unwrap_or_default(),
+                filters_serialized,
+                limit.into(),
+            )
+            .await;
+
+            match maybe_plan {
+                RResult::ROk(p) => ForeignExecutionPlan::new(p)?,
+                RResult::RErr(_) => {
+                    return Err(datafusion::error::DataFusionError::Internal(
+                        "Unable to perform scan via FFI".to_string(),
+                    ))
+                }
+            }
+        };
+
+        Ok(Arc::new(plan))
+    }
+
+    /// Tests whether the table provider can make use of a filter expression
+    /// to optimise data retrieval.
+    fn supports_filters_pushdown(
+        &self,
+        filter: &[&Expr],
+    ) -> Result<Vec<TableProviderFilterPushDown>> {
+        unsafe {
+            let codec = DefaultLogicalExtensionCodec {};
+
+            let expr_list = LogicalExprList {
+                expr: serialize_exprs(filter.iter().map(|f| f.to_owned()), 
&codec)?,
+            };
+            let serialized_filters = expr_list.encode_to_vec();
+
+            let pushdown_fn = self.0.supports_filters_pushdown;
+
+            let pushdowns = pushdown_fn(&self.0, serialized_filters.into());
+
+            match pushdowns {
+                RResult::ROk(p) => Ok(p.iter().map(|v| v.into()).collect()),
+                RResult::RErr(e) => Err(DataFusionError::Plan(e.to_string())),
+            }

Review Comment:
   While supporting this is definitely a good idea I think that implementing 
`supports_filters_pushdown` should be made optional for providers which do not 
support any push down of filters in order to prevent a potentially expensive 
(and wasteful) rendering of the filters as protobuf.



##########
datafusion/ffi/src/execution_plan.rs:
##########
@@ -0,0 +1,373 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{ffi::c_void, pin::Pin, sync::Arc};
+
+use abi_stable::{
+    std_types::{RResult, RStr, RString, RVec},
+    StableAbi,
+};
+use arrow::ffi_stream::FFI_ArrowArrayStream;
+use datafusion::error::Result;
+use datafusion::{
+    error::DataFusionError,
+    execution::{SendableRecordBatchStream, TaskContext},
+    physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
+};
+
+use crate::{
+    plan_properties::{FFI_PlanProperties, ForeignPlanProperties},
+    record_batch_stream::FFI_RecordBatchStream,
+};
+
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+pub struct WrappedArrayStream(#[sabi(unsafe_opaque_field)] pub 
FFI_ArrowArrayStream);
+
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(missing_docs)]
+#[allow(non_camel_case_types)]
+pub struct FFI_ExecutionPlan {

Review Comment:
   I really like the fact that this change follows the same setup as arrow-rs's 
FFI. It makes it familiar to use.



##########
datafusion/ffi/src/record_batch_stream.rs:
##########
@@ -0,0 +1,190 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{ffi::c_void, sync::Arc, task::Poll};
+
+use abi_stable::{
+    std_types::{ROption, RResult, RString},
+    StableAbi,
+};
+use arrow::array::{Array, RecordBatch};
+use arrow::{
+    array::{make_array, StructArray},
+    datatypes::{Schema, SchemaRef},
+    ffi::{from_ffi, to_ffi, FFI_ArrowArray, FFI_ArrowSchema},
+};
+use async_ffi::{ContextExt, FfiContext, FfiPoll};
+use datafusion::error::Result;
+use datafusion::{
+    error::DataFusionError,
+    execution::{RecordBatchStream, SendableRecordBatchStream},
+};
+use futures::{Stream, TryStreamExt};
+
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+pub struct WrappedArray {
+    #[sabi(unsafe_opaque_field)]
+    array: FFI_ArrowArray,
+
+    schema: WrappedSchema,
+}
+
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+pub struct WrappedSchema(#[sabi(unsafe_opaque_field)] FFI_ArrowSchema);
+
+impl From<SchemaRef> for WrappedSchema {
+    fn from(value: SchemaRef) -> Self {
+        let schema = FFI_ArrowSchema::try_from(value.as_ref());
+        WrappedSchema(schema.unwrap_or(FFI_ArrowSchema::empty()))

Review Comment:
   I don't really like that this silently fails but it seems like the only 
error it could realistically encounter is 
https://github.com/apache/arrow-rs/blob/1666a4d7b237370c167e2b90b45d1354e28f500a/arrow-schema/src/ffi.rs#L745-L747.
 This happens in other places in the code as well.



##########
datafusion/ffi/Cargo.toml:
##########
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "datafusion-ffi"
+description = "Foreign Function Interface implementation for DataFusion"
+readme = "README.md"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+repository = { workspace = true }
+license = { workspace = true }
+authors = { workspace = true }
+# Specify MSRV here as `cargo msrv` doesn't support workspace version
+rust-version = "1.76"
+
+[lints]
+workspace = true
+
+[lib]
+name = "datafusion_ffi"
+path = "src/lib.rs"
+
+[dependencies]
+abi_stable = "0.11.3"
+arrow = { workspace = true, features = ["ffi"] }
+async-ffi = { version = "0.5.0", features = ["abi_stable"] }
+async-trait = { workspace = true }
+datafusion = { workspace = true, default-features = true }

Review Comment:
   Given the we are targeting FFI, I think not all features should be enabled 
by default when working the `datafusion-ffi` crate. Some of DataFusion's 
feature bring in some external native dependency (e.g. crypto functions) and 
they may conflict with the native setup of some binding implementations.
   
   Instead I think that we should probably leave the choice of features to the 
user implementing the bindings and leave just the minimum set required for this 
crate to work. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to