This is an automated email from the ASF dual-hosted git repository.

timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9d1bfc1bfb FFI support for versions and alternate tokio runtimes (#13937)
9d1bfc1bfb is described below

commit 9d1bfc1bfb4b6fc59c36a391b21d5b4bb7191804
Author: Tim Saucer <[email protected]>
AuthorDate: Fri Jan 31 20:02:25 2025 -0500

    FFI support for versions and alternate tokio runtimes (#13937)
    
    * Add optional reference to tokio runtime for table providers
    
    * Add function to return the library version over FFI
    
    * Resolve clippy warnings
    
    * Function does not need to be defined as unsafe
    
    * Add integration test for FFI table provider
    
    * Add version call on FFI integration test
    
    * Make the crate use explicit to try to get CI to build it first
    
    * Add license text
    
    * Fix unit test to find deps in ci profile
    
    * Remove ffitest crate and put test lib behind a feature flag
    
    * Add integration-tests feature to CI tests
    
    * Add integration-tests feature to CI run
    
    * Add clarifying text
    
    * Update CI to only run integration tests for certain checks
    
    * When the integration-tests feature is enabled, we get conflicting library entries for the example table provider and the integration test, so disable the example during the CI run
    
    * Remove typo
    
    * Specify each excluded crate separately
    
    * Doc tests did not need the exclusion
    
    * Integration tests shouldn't need doc test
---
 .github/workflows/rust.yml                         |  10 +-
 ci/scripts/rust_clippy.sh                          |   2 +-
 .../ffi/ffi_example_table_provider/src/lib.rs      |   2 +-
 datafusion/ffi/Cargo.toml                          |   9 +-
 datafusion/ffi/src/execution_plan.rs               |  30 ++-
 datafusion/ffi/src/lib.rs                          |  13 +
 datafusion/ffi/src/record_batch_stream.rs          |  28 +-
 datafusion/ffi/src/table_provider.rs               |  21 +-
 datafusion/ffi/src/tests/async_provider.rs         | 282 +++++++++++++++++++++
 datafusion/ffi/src/tests/mod.rs                    |  99 ++++++++
 .../ffi/src/{lib.rs => tests/sync_provider.rs}     |  31 ++-
 datafusion/ffi/tests/table_provider.rs             | 135 ++++++++++
 12 files changed, 630 insertions(+), 32 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index d5d60b131c..0be709a4a7 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -60,7 +60,7 @@ jobs:
         with:
           rust-version: stable
       - name: Prepare cargo build
-        run: cargo check --profile ci --all-targets
-        run: cargo check --profile ci --all-targets --features integration-tests
 
   # cargo check common, functions and substrait with no default features
   linux-cargo-check-no-default-features:
@@ -92,8 +92,8 @@ jobs:
       - name: Check workspace in debug mode
         run: cargo check --profile ci --all-targets --workspace
 
-      - name: Check workspace with avro,json features
-        run: cargo check --profile ci --workspace --benches --features avro,json
+      - name: Check workspace with additional features
+        run: cargo check --profile ci --workspace --benches --features avro,json,integration-tests
 
       - name: Check Cargo.lock for datafusion-cli
         run: |
@@ -185,7 +185,7 @@ jobs:
         with:
           rust-version: stable
       - name: Run tests (excluding doctests)
-        run: cargo test --profile ci --exclude datafusion-examples --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace
+        run: cargo test --profile ci --exclude datafusion-examples --exclude ffi_example_table_provider --exclude datafusion-benchmarks --workspace --lib --tests --bins --features avro,json,backtrace,integration-tests
       - name: Verify Working Directory Clean
         run: git diff --exit-code
 
@@ -417,7 +417,7 @@ jobs:
       - name: Run tests (excluding doctests)
         shell: bash
         run: |
-          cargo test --profile ci --lib --tests --bins --features avro,json,backtrace
+          cargo test --profile ci --lib --tests --bins --features avro,json,backtrace,integration-tests
           cd datafusion-cli
           cargo test --profile ci --lib --tests --bins --all-features
 
diff --git a/ci/scripts/rust_clippy.sh b/ci/scripts/rust_clippy.sh
index f5c8b61e1c..01eb6e710a 100755
--- a/ci/scripts/rust_clippy.sh
+++ b/ci/scripts/rust_clippy.sh
@@ -18,6 +18,6 @@
 # under the License.
 
 set -ex
-cargo clippy --all-targets --workspace --features avro,pyarrow -- -D warnings
+cargo clippy --all-targets --workspace --features avro,pyarrow,integration-tests -- -D warnings
 cd datafusion-cli
 cargo clippy --all-targets --all-features -- -D warnings
diff --git a/datafusion-examples/examples/ffi/ffi_example_table_provider/src/lib.rs b/datafusion-examples/examples/ffi/ffi_example_table_provider/src/lib.rs
index c7eea8a807..c37d8f835c 100644
--- a/datafusion-examples/examples/ffi/ffi_example_table_provider/src/lib.rs
+++ b/datafusion-examples/examples/ffi/ffi_example_table_provider/src/lib.rs
@@ -53,7 +53,7 @@ extern "C" fn construct_simple_table_provider() -> FFI_TableProvider {
 
     let table_provider = MemTable::try_new(schema, vec![batches]).unwrap();
 
-    FFI_TableProvider::new(Arc::new(table_provider), true)
+    FFI_TableProvider::new(Arc::new(table_provider), true, None)
 }
 
 #[export_root_module]
diff --git a/datafusion/ffi/Cargo.toml b/datafusion/ffi/Cargo.toml
index a0179ec44d..1a6248322d 100644
--- a/datafusion/ffi/Cargo.toml
+++ b/datafusion/ffi/Cargo.toml
@@ -33,10 +33,13 @@ workspace = true
 [lib]
 name = "datafusion_ffi"
 path = "src/lib.rs"
+crate-type = ["cdylib", "rlib"]
 
 [dependencies]
 abi_stable = "0.11.3"
 arrow = { workspace = true, features = ["ffi"] }
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
 async-ffi = { version = "0.5.0", features = ["abi_stable"] }
 async-trait = { workspace = true }
 datafusion = { workspace = true, default-features = false }
@@ -44,7 +47,11 @@ datafusion-proto = { workspace = true }
 futures = { workspace = true }
 log = { workspace = true }
 prost = { workspace = true }
+semver = "1.0.24"
+tokio = { workspace = true }
 
 [dev-dependencies]
 doc-comment = { workspace = true }
-tokio = { workspace = true }
+
+[features]
+integration-tests = []
diff --git a/datafusion/ffi/src/execution_plan.rs b/datafusion/ffi/src/execution_plan.rs
index 5ab321cc01..a8c2f42fe2 100644
--- a/datafusion/ffi/src/execution_plan.rs
+++ b/datafusion/ffi/src/execution_plan.rs
@@ -27,6 +27,7 @@ use datafusion::{
     execution::{SendableRecordBatchStream, TaskContext},
     physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
 };
+use tokio::runtime::Runtime;
 
 use crate::{
     plan_properties::FFI_PlanProperties, record_batch_stream::FFI_RecordBatchStream,
@@ -71,6 +72,7 @@ unsafe impl Sync for FFI_ExecutionPlan {}
 pub struct ExecutionPlanPrivateData {
     pub plan: Arc<dyn ExecutionPlan>,
     pub context: Arc<TaskContext>,
+    pub runtime: Option<Arc<Runtime>>,
 }
 
 unsafe extern "C" fn properties_fn_wrapper(
@@ -88,11 +90,14 @@ unsafe extern "C" fn children_fn_wrapper(
     let private_data = plan.private_data as *const ExecutionPlanPrivateData;
     let plan = &(*private_data).plan;
     let ctx = &(*private_data).context;
+    let runtime = &(*private_data).runtime;
 
     let children: Vec<_> = plan
         .children()
         .into_iter()
-        .map(|child| FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx)))
+        .map(|child| {
+            FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone())
+        })
         .collect();
 
     children.into()
@@ -105,9 +110,10 @@ unsafe extern "C" fn execute_fn_wrapper(
     let private_data = plan.private_data as *const ExecutionPlanPrivateData;
     let plan = &(*private_data).plan;
     let ctx = &(*private_data).context;
+    let runtime = (*private_data).runtime.as_ref().map(Arc::clone);
 
     match plan.execute(partition, Arc::clone(ctx)) {
-        Ok(rbs) => RResult::ROk(rbs.into()),
+        Ok(rbs) => RResult::ROk(FFI_RecordBatchStream::new(rbs, runtime)),
         Err(e) => RResult::RErr(
             format!("Error occurred during FFI_ExecutionPlan execute: {}", e).into(),
         ),
@@ -129,7 +135,11 @@ unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_Execution
     let private_data = plan.private_data as *const ExecutionPlanPrivateData;
     let plan_data = &(*private_data);
 
-    FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan), Arc::clone(&plan_data.context))
+    FFI_ExecutionPlan::new(
+        Arc::clone(&plan_data.plan),
+        Arc::clone(&plan_data.context),
+        plan_data.runtime.clone(),
+    )
 }
 
 impl Clone for FFI_ExecutionPlan {
@@ -140,8 +150,16 @@ impl Clone for FFI_ExecutionPlan {
 
 impl FFI_ExecutionPlan {
     /// This function is called on the provider's side.
-    pub fn new(plan: Arc<dyn ExecutionPlan>, context: Arc<TaskContext>) -> Self {
-        let private_data = Box::new(ExecutionPlanPrivateData { plan, context });
+    pub fn new(
+        plan: Arc<dyn ExecutionPlan>,
+        context: Arc<TaskContext>,
+        runtime: Option<Arc<Runtime>>,
+    ) -> Self {
+        let private_data = Box::new(ExecutionPlanPrivateData {
+            plan,
+            context,
+            runtime,
+        });
 
         Self {
             properties: properties_fn_wrapper,
@@ -357,7 +375,7 @@ mod tests {
         let original_plan = Arc::new(EmptyExec::new(schema));
         let original_name = original_plan.name().to_string();
 
-        let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx());
+        let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None);
 
         let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?;
 
diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs
index 8e09780edf..bef36b1ddd 100644
--- a/datafusion/ffi/src/lib.rs
+++ b/datafusion/ffi/src/lib.rs
@@ -26,5 +26,18 @@ pub mod session_config;
 pub mod table_provider;
 pub mod table_source;
 
+#[cfg(feature = "integration-tests")]
+pub mod tests;
+
+/// Returns the major version of the FFI implementation. As the API evolves,
+/// the major version identifies compatibility across the unsafe boundary.
+/// Implementers should call this to validate that their libraries are
+/// compatible.
+pub extern "C" fn version() -> u64 {
+    let version_str = env!("CARGO_PKG_VERSION");
+    let version = semver::Version::parse(version_str).expect("Invalid version string");
+    version.major
+}
+
 #[cfg(doctest)]
 doc_comment::doctest!("../README.md", readme_example_test);
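
As an illustration of the version check, a consumer holding an
FFI_TableProvider could compare the foreign library's major version against
its own before crossing the unsafe boundary. A minimal sketch, assuming
`ffi_provider` is an FFI_TableProvider obtained from the foreign library:

    // Sketch only: validate FFI compatibility before using the provider.
    let foreign_major = unsafe { (ffi_provider.version)() };
    let local_major = datafusion_ffi::version();
    assert_eq!(
        foreign_major, local_major,
        "datafusion-ffi major versions differ; the ABI may be incompatible"
    );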
diff --git a/datafusion/ffi/src/record_batch_stream.rs b/datafusion/ffi/src/record_batch_stream.rs
index c944e56c5c..878ac24f67 100644
--- a/datafusion/ffi/src/record_batch_stream.rs
+++ b/datafusion/ffi/src/record_batch_stream.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::{ffi::c_void, task::Poll};
+use std::{ffi::c_void, sync::Arc, task::Poll};
 
 use abi_stable::{
     std_types::{ROption, RResult, RString},
@@ -33,6 +33,7 @@ use datafusion::{
     execution::{RecordBatchStream, SendableRecordBatchStream},
 };
 use futures::{Stream, TryStreamExt};
+use tokio::runtime::Runtime;
 
 use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
 
@@ -58,12 +59,27 @@ pub struct FFI_RecordBatchStream {
     pub private_data: *mut c_void,
 }
 
+pub struct RecordBatchStreamPrivateData {
+    pub rbs: SendableRecordBatchStream,
+    pub runtime: Option<Arc<Runtime>>,
+}
+
 impl From<SendableRecordBatchStream> for FFI_RecordBatchStream {
     fn from(stream: SendableRecordBatchStream) -> Self {
+        Self::new(stream, None)
+    }
+}
+
+impl FFI_RecordBatchStream {
+    pub fn new(stream: SendableRecordBatchStream, runtime: Option<Arc<Runtime>>) -> Self {
+        let private_data = Box::into_raw(Box::new(RecordBatchStreamPrivateData {
+            rbs: stream,
+            runtime,
+        })) as *mut c_void;
         FFI_RecordBatchStream {
             poll_next: poll_next_fn_wrapper,
             schema: schema_fn_wrapper,
-            private_data: Box::into_raw(Box::new(stream)) as *mut c_void,
+            private_data,
         }
     }
 }
@@ -71,7 +87,8 @@ impl From<SendableRecordBatchStream> for FFI_RecordBatchStream {
 unsafe impl Send for FFI_RecordBatchStream {}
 
 unsafe extern "C" fn schema_fn_wrapper(stream: &FFI_RecordBatchStream) -> WrappedSchema {
-    let stream = stream.private_data as *const SendableRecordBatchStream;
+    let private_data = stream.private_data as *const RecordBatchStreamPrivateData;
+    let stream = &(*private_data).rbs;
 
     (*stream).schema().into()
 }
@@ -106,7 +123,10 @@ unsafe extern "C" fn poll_next_fn_wrapper(
     stream: &FFI_RecordBatchStream,
     cx: &mut FfiContext,
 ) -> FfiPoll<ROption<RResult<WrappedArray, RString>>> {
-    let stream = stream.private_data as *mut SendableRecordBatchStream;
+    let private_data = stream.private_data as *mut RecordBatchStreamPrivateData;
+    let stream = &mut (*private_data).rbs;
+
+    let _guard = (*private_data).runtime.as_ref().map(|rt| rt.enter());
 
     let poll_result = cx.with_context(|std_cx| {
         (*stream)
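
The `_guard` above relies on tokio's Runtime::enter, which makes the
producer's runtime current on the calling thread until the guard is dropped.
A minimal sketch of the pattern in isolation:

    use std::sync::Arc;
    use tokio::runtime::Runtime;

    fn poll_with_optional_runtime(runtime: Option<Arc<Runtime>>) {
        // While `_guard` lives, context-dependent calls such as
        // tokio::spawn use `runtime` instead of panicking for lack
        // of a reactor. The previous context is restored on drop.
        let _guard = runtime.as_ref().map(|rt| rt.enter());
        // ... poll the wrapped stream here ...
    }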
diff --git a/datafusion/ffi/src/table_provider.rs b/datafusion/ffi/src/table_provider.rs
index b229d908d1..183dfc8755 100644
--- a/datafusion/ffi/src/table_provider.rs
+++ b/datafusion/ffi/src/table_provider.rs
@@ -40,6 +40,7 @@ use datafusion_proto::{
     protobuf::LogicalExprList,
 };
 use prost::Message;
+use tokio::runtime::Runtime;
 
 use crate::{
     arrow_wrappers::WrappedSchema,
@@ -139,6 +140,9 @@ pub struct FFI_TableProvider {
     /// Release the memory of the private data when it is no longer being used.
     pub release: unsafe extern "C" fn(arg: &mut Self),
 
+    /// Return the major DataFusion version number of this provider.
+    pub version: unsafe extern "C" fn() -> u64,
+
     /// Internal data. This is only to be accessed by the provider of the plan.
     /// A [`ForeignExecutionPlan`] should never attempt to access this data.
     pub private_data: *mut c_void,
@@ -149,6 +153,7 @@ unsafe impl Sync for FFI_TableProvider {}
 
 struct ProviderPrivateData {
     provider: Arc<dyn TableProvider + Send>,
+    runtime: Option<Arc<Runtime>>,
 }
 
 unsafe extern "C" fn schema_fn_wrapper(provider: &FFI_TableProvider) -> WrappedSchema {
@@ -216,6 +221,7 @@ unsafe extern "C" fn scan_fn_wrapper(
     let private_data = provider.private_data as *mut ProviderPrivateData;
     let internal_provider = &(*private_data).provider;
     let session_config = session_config.clone();
+    let runtime = &(*private_data).runtime;
 
     async move {
         let config = match ForeignSessionConfig::try_from(&session_config) {
@@ -261,7 +267,11 @@ unsafe extern "C" fn scan_fn_wrapper(
             Err(e) => return RResult::RErr(e.to_string().into()),
         };
 
-        RResult::ROk(FFI_ExecutionPlan::new(plan, ctx.task_ctx()))
+        RResult::ROk(FFI_ExecutionPlan::new(
+            plan,
+            ctx.task_ctx(),
+            runtime.clone(),
+        ))
     }
     .into_ffi()
 }
@@ -273,9 +283,11 @@ unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_TableProvider) {
 
 unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_TableProvider {
     let old_private_data = provider.private_data as *const ProviderPrivateData;
+    let runtime = (*old_private_data).runtime.clone();
 
     let private_data = Box::into_raw(Box::new(ProviderPrivateData {
         provider: Arc::clone(&(*old_private_data).provider),
+        runtime,
     })) as *mut c_void;
 
     FFI_TableProvider {
@@ -285,6 +297,7 @@ unsafe extern "C" fn clone_fn_wrapper(provider: &FFI_TableProvider) -> FFI_Table
         supports_filters_pushdown: provider.supports_filters_pushdown,
         clone: clone_fn_wrapper,
         release: release_fn_wrapper,
+        version: super::version,
         private_data,
     }
 }
@@ -300,8 +313,9 @@ impl FFI_TableProvider {
     pub fn new(
         provider: Arc<dyn TableProvider + Send>,
         can_support_pushdown_filters: bool,
+        runtime: Option<Arc<Runtime>>,
     ) -> Self {
-        let private_data = Box::new(ProviderPrivateData { provider });
+        let private_data = Box::new(ProviderPrivateData { provider, runtime });
 
         Self {
             schema: schema_fn_wrapper,
@@ -313,6 +327,7 @@ impl FFI_TableProvider {
             },
             clone: clone_fn_wrapper,
             release: release_fn_wrapper,
+            version: super::version,
             private_data: Box::into_raw(private_data) as *mut c_void,
         }
     }
@@ -463,7 +478,7 @@ mod tests {
         let provider =
            Arc::new(MemTable::try_new(schema, vec![vec![batch1], vec![batch2]])?);
 
-        let ffi_provider = FFI_TableProvider::new(provider, true);
+        let ffi_provider = FFI_TableProvider::new(provider, true, None);
 
        let foreign_table_provider: ForeignTableProvider = (&ffi_provider).into();
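
On the producer side, a library that drives its plans from its own tokio
runtime can now hand that runtime to the wrapper. A minimal sketch, where
`provider` stands in for any Arc<dyn TableProvider + Send>:

    use std::sync::Arc;
    use datafusion_ffi::table_provider::FFI_TableProvider;
    use tokio::runtime::Runtime;

    // Hypothetical setup: a runtime owned by the producing library.
    let runtime = Arc::new(Runtime::new().expect("failed to build runtime"));
    // Streams from this provider will enter `runtime` when polled.
    let ffi_provider = FFI_TableProvider::new(provider, true, Some(runtime));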
 
diff --git a/datafusion/ffi/src/tests/async_provider.rs b/datafusion/ffi/src/tests/async_provider.rs
new file mode 100644
index 0000000000..38ddd13952
--- /dev/null
+++ b/datafusion/ffi/src/tests/async_provider.rs
@@ -0,0 +1,282 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This is an example of an async table provider that calls functions on
+//! the tokio runtime of the library providing the function. Since we cannot
+//! share a tokio runtime across the FFI boundary, and the producer and
+//! consumer may have different runtimes, we need to store a reference to the
+//! producer's runtime and enter it during streaming calls. The
+//! datafusion_ffi crate performs this entering during the streaming calls.
+//! This code serves as an integration test of the feature. If we do not
+//! correctly enter the runtime, operations such as spawning a tokio task
+//! will panic.
+
+use std::{any::Any, fmt::Debug, sync::Arc};
+
+use crate::table_provider::FFI_TableProvider;
+use arrow_array::RecordBatch;
+use arrow_schema::Schema;
+use async_trait::async_trait;
+use datafusion::{
+    catalog::{Session, TableProvider},
+    error::{DataFusionError, Result},
+    execution::RecordBatchStream,
+    physical_expr::EquivalenceProperties,
+    physical_plan::{ExecutionPlan, Partitioning},
+    prelude::Expr,
+};
+use futures::Stream;
+use tokio::{
+    runtime::Runtime,
+    sync::{broadcast, mpsc},
+};
+
+use super::create_record_batch;
+
+#[derive(Debug)]
+pub struct AsyncTableProvider {
+    batch_request: mpsc::Sender<bool>,
+    shutdown: mpsc::Sender<bool>,
+    batch_receiver: broadcast::Receiver<Option<RecordBatch>>,
+    _join_handle: Option<std::thread::JoinHandle<()>>,
+}
+
+fn async_table_provider_thread(
+    mut shutdown: mpsc::Receiver<bool>,
+    mut batch_request: mpsc::Receiver<bool>,
+    batch_sender: broadcast::Sender<Option<RecordBatch>>,
+    tokio_rt: mpsc::Sender<Arc<Runtime>>,
+) {
+    let runtime = Arc::new(
+        tokio::runtime::Builder::new_current_thread()
+            .build()
+            .expect("Unable to create tokio runtime"),
+    );
+    let _runtime_guard = runtime.enter();
+    tokio_rt
+        .blocking_send(Arc::clone(&runtime))
+        .expect("Unable to send tokio runtime back to main thread");
+
+    runtime.block_on(async move {
+        let mut num_received = 0;
+        while let Some(true) = batch_request.recv().await {
+            let record_batch = match num_received {
+                0 => Some(create_record_batch(1, 5)),
+                1 => Some(create_record_batch(6, 1)),
+                2 => Some(create_record_batch(7, 5)),
+                _ => None,
+            };
+            num_received += 1;
+
+            if batch_sender.send(record_batch).is_err() {
+                break;
+            }
+        }
+    });
+
+    let _ = shutdown.blocking_recv();
+}
+
+pub fn start_async_provider() -> (AsyncTableProvider, Arc<Runtime>) {
+    let (batch_request_tx, batch_request_rx) = mpsc::channel(10);
+    let (record_batch_tx, record_batch_rx) = broadcast::channel(10);
+    let (tokio_rt_tx, mut tokio_rt_rx) = mpsc::channel(10);
+    let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
+
+    // It is important that we are not using tokio to spawn here. We want this
+    // other thread to create its own runtime, which is similar to a model used
+    // in datafusion-python and probably other places. This will let us test
+    // that we do correctly enter the runtime of the foreign provider.
+    let join_handle = Some(std::thread::spawn(move || {
+        async_table_provider_thread(
+            shutdown_rx,
+            batch_request_rx,
+            record_batch_tx,
+            tokio_rt_tx,
+        )
+    }));
+
+    let tokio_rt = tokio_rt_rx
+        .blocking_recv()
+        .expect("Unable to receive tokio runtime from spawned thread");
+
+    let table_provider = AsyncTableProvider {
+        shutdown: shutdown_tx,
+        batch_request: batch_request_tx,
+        batch_receiver: record_batch_rx,
+        _join_handle: join_handle,
+    };
+
+    (table_provider, tokio_rt)
+}
+
+#[async_trait]
+impl TableProvider for AsyncTableProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        super::create_test_schema()
+    }
+
+    fn table_type(&self) -> datafusion::logical_expr::TableType {
+        datafusion::logical_expr::TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &dyn Session,
+        _projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(AsyncTestExecutionPlan::new(
+            self.batch_request.clone(),
+            self.batch_receiver.resubscribe(),
+        )))
+    }
+}
+
+impl Drop for AsyncTableProvider {
+    fn drop(&mut self) {
+        self.shutdown
+            .blocking_send(false)
+            .expect("Unable to call shutdown on spawned thread.")
+    }
+}
+
+#[derive(Debug)]
+struct AsyncTestExecutionPlan {
+    properties: datafusion::physical_plan::PlanProperties,
+    batch_request: mpsc::Sender<bool>,
+    batch_receiver: broadcast::Receiver<Option<RecordBatch>>,
+}
+
+impl AsyncTestExecutionPlan {
+    pub fn new(
+        batch_request: mpsc::Sender<bool>,
+        batch_receiver: broadcast::Receiver<Option<RecordBatch>>,
+    ) -> Self {
+        Self {
+            properties: datafusion::physical_plan::PlanProperties::new(
+                EquivalenceProperties::new(super::create_test_schema()),
+                Partitioning::UnknownPartitioning(3),
+                datafusion::physical_plan::execution_plan::EmissionType::Incremental,
+                datafusion::physical_plan::execution_plan::Boundedness::Bounded,
+            ),
+            batch_request,
+            batch_receiver,
+        }
+    }
+}
+
+impl ExecutionPlan for AsyncTestExecutionPlan {
+    fn name(&self) -> &str {
+        "async test execution plan"
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+        &self.properties
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        Vec::default()
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<datafusion::execution::TaskContext>,
+    ) -> Result<datafusion::execution::SendableRecordBatchStream> {
+        Ok(Box::pin(AsyncTestRecordBatchStream {
+            batch_request: self.batch_request.clone(),
+            batch_receiver: self.batch_receiver.resubscribe(),
+        }))
+    }
+}
+
+impl datafusion::physical_plan::DisplayAs for AsyncTestExecutionPlan {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        _f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        // Do nothing, just a test
+        Ok(())
+    }
+}
+
+struct AsyncTestRecordBatchStream {
+    batch_request: mpsc::Sender<bool>,
+    batch_receiver: broadcast::Receiver<Option<RecordBatch>>,
+}
+
+impl RecordBatchStream for AsyncTestRecordBatchStream {
+    fn schema(&self) -> arrow_schema::SchemaRef {
+        super::create_test_schema()
+    }
+}
+
+impl Stream for AsyncTestRecordBatchStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        _cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let mut this = self.as_mut();
+
+        #[allow(clippy::disallowed_methods)]
+        tokio::spawn(async move {
+            // Nothing to do. We just need to simulate an async
+            // task running
+        });
+
+        if let Err(e) = this.batch_request.try_send(true) {
+            return std::task::Poll::Ready(Some(Err(DataFusionError::Execution(
+                format!("Unable to send batch request, {}", e),
+            ))));
+        }
+
+        match this.batch_receiver.blocking_recv() {
+            Ok(batch) => match batch {
+                Some(batch) => std::task::Poll::Ready(Some(Ok(batch))),
+                None => std::task::Poll::Ready(None),
+            },
+            Err(e) => std::task::Poll::Ready(Some(Err(DataFusionError::Execution(
+                format!("Unable to receive record batch: {}", e),
+            )))),
+        }
+    }
+}
+
+pub(crate) fn create_async_table_provider() -> FFI_TableProvider {
+    let (table_provider, tokio_rt) = start_async_provider();
+    FFI_TableProvider::new(Arc::new(table_provider), true, Some(tokio_rt))
+}
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
new file mode 100644
index 0000000000..d2e865d6c2
--- /dev/null
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use abi_stable::{
+    declare_root_module_statics, export_root_module,
+    library::{LibraryError, RootModule},
+    package_version_strings,
+    prefix_type::PrefixTypeTrait,
+    sabi_types::VersionStrings,
+    StableAbi,
+};
+
+use super::table_provider::FFI_TableProvider;
+use arrow_array::RecordBatch;
+use async_provider::create_async_table_provider;
+use datafusion::{
+    arrow::datatypes::{DataType, Field, Schema},
+    common::record_batch,
+};
+use sync_provider::create_sync_table_provider;
+
+mod async_provider;
+mod sync_provider;
+
+#[repr(C)]
+#[derive(StableAbi)]
+#[sabi(kind(Prefix(prefix_ref = TableProviderModuleRef)))]
+/// This struct defines the module interfaces. It is to be shared by
+/// both the module loading program and library that implements the
+/// module. It is possible to move this definition into the loading
+/// program and reference it in the modules, but this example shows
+/// how a user may wish to separate these concerns.
+pub struct TableProviderModule {
+    /// Constructs the table provider
+    pub create_table: extern "C" fn(synchronous: bool) -> FFI_TableProvider,
+
+    pub version: extern "C" fn() -> u64,
+}
+
+impl RootModule for TableProviderModuleRef {
+    declare_root_module_statics! {TableProviderModuleRef}
+    const BASE_NAME: &'static str = "datafusion_ffi";
+    const NAME: &'static str = "datafusion_ffi";
+    const VERSION_STRINGS: VersionStrings = package_version_strings!();
+
+    fn initialization(self) -> Result<Self, LibraryError> {
+        Ok(self)
+    }
+}
+
+fn create_test_schema() -> Arc<Schema> {
+    Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Int32, true),
+        Field::new("b", DataType::Float64, true),
+    ]))
+}
+
+pub fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {
+    let end_value = start_value + num_values as i32;
+    let a_vals: Vec<i32> = (start_value..end_value).collect();
+    let b_vals: Vec<f64> = a_vals.iter().map(|v| *v as f64).collect();
+
+    record_batch!(("a", Int32, a_vals), ("b", Float64, b_vals)).unwrap()
+}
+
+/// Here we only wish to create a simple table provider as an example.
+/// We create an in-memory table and convert it to its FFI counterpart.
+extern "C" fn construct_table_provider(synchronous: bool) -> FFI_TableProvider {
+    match synchronous {
+        true => create_sync_table_provider(),
+        false => create_async_table_provider(),
+    }
+}
+
+#[export_root_module]
+/// This defines the entry point for using the module.
+pub fn get_simple_memory_table() -> TableProviderModuleRef {
+    TableProviderModule {
+        create_table: construct_table_provider,
+        version: super::version,
+    }
+    .leak_into_prefix()
+}
diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/tests/sync_provider.rs
similarity index 54%
copy from datafusion/ffi/src/lib.rs
copy to datafusion/ffi/src/tests/sync_provider.rs
index 8e09780edf..ff85e0b15b 100644
--- a/datafusion/ffi/src/lib.rs
+++ b/datafusion/ffi/src/tests/sync_provider.rs
@@ -15,16 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
-#![deny(clippy::clone_on_ref_ptr)]
+use std::sync::Arc;
 
-pub mod arrow_wrappers;
-pub mod execution_plan;
-pub mod plan_properties;
-pub mod record_batch_stream;
-pub mod session_config;
-pub mod table_provider;
-pub mod table_source;
+use crate::table_provider::FFI_TableProvider;
+use datafusion::datasource::MemTable;
 
-#[cfg(doctest)]
-doc_comment::doctest!("../README.md", readme_example_test);
+use super::{create_record_batch, create_test_schema};
+
+pub(crate) fn create_sync_table_provider() -> FFI_TableProvider {
+    let schema = create_test_schema();
+
+    // It is useful to create these as multiple record batches
+    // so that we can demonstrate the FFI stream.
+    let batches = vec![
+        create_record_batch(1, 5),
+        create_record_batch(6, 1),
+        create_record_batch(7, 5),
+    ];
+
+    let table_provider = MemTable::try_new(schema, vec![batches]).unwrap();
+
+    FFI_TableProvider::new(Arc::new(table_provider), true, None)
+}
diff --git a/datafusion/ffi/tests/table_provider.rs b/datafusion/ffi/tests/table_provider.rs
new file mode 100644
index 0000000000..9169c9f422
--- /dev/null
+++ b/datafusion/ffi/tests/table_provider.rs
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// Add an additional module here for convenience, to scope this code to
+/// build only when the integration-tests feature is enabled
+#[cfg(feature = "integration-tests")]
+mod tests {
+
+    use abi_stable::library::RootModule;
+    use datafusion::error::{DataFusionError, Result};
+    use datafusion::prelude::SessionContext;
+    use datafusion_ffi::table_provider::ForeignTableProvider;
+    use datafusion_ffi::tests::TableProviderModuleRef;
+    use std::path::Path;
+    use std::sync::Arc;
+
+    /// Compute the path to the library. It would be preferable to simply use
+    /// abi_stable::library::development_utils::compute_library_path; however,
+    /// our current CI pipeline builds with a `ci` profile, so we also need to
+    /// search that directory for the library.
+    pub fn compute_library_path<M: RootModule>(
+        target_path: &Path,
+    ) -> std::io::Result<std::path::PathBuf> {
+        let debug_dir = target_path.join("debug");
+        let release_dir = target_path.join("release");
+        let ci_dir = target_path.join("ci");
+
+        let debug_path = M::get_library_path(&debug_dir.join("deps"));
+        let release_path = M::get_library_path(&release_dir.join("deps"));
+        let ci_path = M::get_library_path(&ci_dir.join("deps"));
+
+        let all_paths = vec![
+            (debug_dir.clone(), debug_path),
+            (release_dir, release_path),
+            (ci_dir, ci_path),
+        ];
+
+        let best_path = all_paths
+            .into_iter()
+            .filter(|(_, path)| path.exists())
+            .filter_map(|(dir, path)| path.metadata().map(|m| (dir, m)).ok())
+            .filter_map(|(dir, meta)| meta.modified().map(|m| (dir, m)).ok())
+            .max_by_key(|(_, date)| *date)
+            .map(|(dir, _)| dir)
+            .unwrap_or(debug_dir);
+
+        Ok(best_path)
+    }
+
+    /// It is important that this test is in the `tests` directory and not in
+    /// the library directory so we can verify we are building a dynamic
+    /// library and testing it via a different executable.
+    #[cfg(feature = "integration-tests")]
+    async fn test_table_provider(synchronous: bool) -> Result<()> {
+        let expected_version = datafusion_ffi::version();
+
+        let crate_root = Path::new(env!("CARGO_MANIFEST_DIR"));
+        let target_dir = crate_root
+            .parent()
+            .expect("Failed to find crate parent")
+            .parent()
+            .expect("Failed to find workspace root")
+            .join("target");
+
+        // Find the location of the library. This is specific to the build
+        // environment, so you will need to change the approach here based on
+        // your use case.
+        // let target: &std::path::Path = "../../../../target/".as_ref();
+        let library_path =
+            compute_library_path::<TableProviderModuleRef>(target_dir.as_path())
+                .map_err(|e| DataFusionError::External(Box::new(e)))?
+                .join("deps");
+
+        // Load the module
+        let table_provider_module =
+            TableProviderModuleRef::load_from_directory(&library_path)
+                .map_err(|e| DataFusionError::External(Box::new(e)))?;
+
+        assert_eq!(
+            table_provider_module
+                .version()
+                .expect("Unable to call version on FFI module")(),
+            expected_version
+        );
+
+        // By calling the code below, the table provider will be created within
+        // the module's code.
+        let ffi_table_provider = table_provider_module.create_table().ok_or(
+            DataFusionError::NotImplemented(
+                "External table provider failed to implement create_table".to_string(),
+            ),
+        )?(synchronous);
+
+        // In order to access the table provider within this executable, we need to
+        // turn it into a `ForeignTableProvider`.
+        let foreign_table_provider: ForeignTableProvider = (&ffi_table_provider).into();
+
+        let ctx = SessionContext::new();
+
+        // Display the data to show the full cycle works.
+        ctx.register_table("external_table", Arc::new(foreign_table_provider))?;
+        let df = ctx.table("external_table").await?;
+        let results = df.collect().await?;
+
+        assert_eq!(results.len(), 3);
+        assert_eq!(results[0], datafusion_ffi::tests::create_record_batch(1, 5));
+        assert_eq!(results[1], datafusion_ffi::tests::create_record_batch(6, 1));
+        assert_eq!(results[2], datafusion_ffi::tests::create_record_batch(7, 5));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn async_test_table_provider() -> Result<()> {
+        test_table_provider(false).await
+    }
+
+    #[tokio::test]
+    async fn sync_test_table_provider() -> Result<()> {
+        test_table_provider(true).await
+    }
+}

