This is an automated email from the ASF dual-hosted git repository.

timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 72705a31e2 Add CatalogProvider and SchemaProvider to FFI Crate (#15280)
72705a31e2 is described below

commit 72705a31e219458fd152694df816e54be940f32c
Author: Tim Saucer <[email protected]>
AuthorDate: Tue Mar 18 07:19:22 2025 -0400

    Add CatalogProvider and SchemaProvider to FFI Crate (#15280)
    
    * initial commit for schema and catalog providers
    
    * Add unit tests
    
    * Update docstrings
    
    * Add integration test
---
 datafusion/ffi/src/catalog_provider.rs  | 338 ++++++++++++++++++++++++++++
 datafusion/ffi/src/lib.rs               |   2 +
 datafusion/ffi/src/schema_provider.rs   | 385 ++++++++++++++++++++++++++++++++
 datafusion/ffi/src/table_provider.rs    |   2 +-
 datafusion/ffi/src/tests/catalog.rs     | 183 +++++++++++++++
 datafusion/ffi/src/tests/mod.rs         |   8 +
 datafusion/ffi/tests/ffi_integration.rs |  27 +++
 7 files changed, 944 insertions(+), 1 deletion(-)

diff --git a/datafusion/ffi/src/catalog_provider.rs 
b/datafusion/ffi/src/catalog_provider.rs
new file mode 100644
index 0000000000..0886d4749d
--- /dev/null
+++ b/datafusion/ffi/src/catalog_provider.rs
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, ffi::c_void, sync::Arc};
+
+use abi_stable::{
+    std_types::{ROption, RResult, RString, RVec},
+    StableAbi,
+};
+use datafusion::catalog::{CatalogProvider, SchemaProvider};
+use tokio::runtime::Handle;
+
+use crate::{
+    df_result, rresult_return,
+    schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider},
+};
+
+use datafusion::error::Result;
+
+/// A stable struct for sharing [`CatalogProvider`] across FFI boundaries.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_CatalogProvider {
+    pub schema_names: unsafe extern "C" fn(provider: &Self) -> RVec<RString>,
+
+    pub schema: unsafe extern "C" fn(
+        provider: &Self,
+        name: RString,
+    ) -> ROption<FFI_SchemaProvider>,
+
+    pub register_schema:
+        unsafe extern "C" fn(
+            provider: &Self,
+            name: RString,
+            schema: &FFI_SchemaProvider,
+        ) -> RResult<ROption<FFI_SchemaProvider>, RString>,
+
+    pub deregister_schema:
+        unsafe extern "C" fn(
+            provider: &Self,
+            name: RString,
+            cascade: bool,
+        ) -> RResult<ROption<FFI_SchemaProvider>, RString>,
+
+    /// Used to create a clone of this catalog provider. This should
+    /// only need to be called by the receiver of the catalog provider.
+    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(arg: &mut Self),
+
+    /// Return the major DataFusion version number of this provider.
+    pub version: unsafe extern "C" fn() -> u64,
+
+    /// Internal data. This is only to be accessed by the provider of the catalog.
+    /// A [`ForeignCatalogProvider`] should never attempt to access this data.
+    pub private_data: *mut c_void,
+}
+
+unsafe impl Send for FFI_CatalogProvider {}
+unsafe impl Sync for FFI_CatalogProvider {}
+
+struct ProviderPrivateData {
+    provider: Arc<dyn CatalogProvider + Send>,
+    runtime: Option<Handle>,
+}
+
+impl FFI_CatalogProvider {
+    unsafe fn inner(&self) -> &Arc<dyn CatalogProvider + Send> {
+        let private_data = self.private_data as *const ProviderPrivateData;
+        &(*private_data).provider
+    }
+
+    unsafe fn runtime(&self) -> Option<Handle> {
+        let private_data = self.private_data as *const ProviderPrivateData;
+        (*private_data).runtime.clone()
+    }
+}
+
+unsafe extern "C" fn schema_names_fn_wrapper(
+    provider: &FFI_CatalogProvider,
+) -> RVec<RString> {
+    let names = provider.inner().schema_names();
+    names.into_iter().map(|s| s.into()).collect()
+}
+
+unsafe extern "C" fn schema_fn_wrapper(
+    provider: &FFI_CatalogProvider,
+    name: RString,
+) -> ROption<FFI_SchemaProvider> {
+    let maybe_schema = provider.inner().schema(name.as_str());
+    maybe_schema
+        .map(|schema| FFI_SchemaProvider::new(schema, provider.runtime()))
+        .into()
+}
+
+unsafe extern "C" fn register_schema_fn_wrapper(
+    provider: &FFI_CatalogProvider,
+    name: RString,
+    schema: &FFI_SchemaProvider,
+) -> RResult<ROption<FFI_SchemaProvider>, RString> {
+    let runtime = provider.runtime();
+    let provider = provider.inner();
+    let schema = Arc::new(ForeignSchemaProvider::from(schema));
+
+    let returned_schema =
+        rresult_return!(provider.register_schema(name.as_str(), schema))
+            .map(|schema| FFI_SchemaProvider::new(schema, runtime))
+            .into();
+
+    RResult::ROk(returned_schema)
+}
+
+unsafe extern "C" fn deregister_schema_fn_wrapper(
+    provider: &FFI_CatalogProvider,
+    name: RString,
+    cascade: bool,
+) -> RResult<ROption<FFI_SchemaProvider>, RString> {
+    let runtime = provider.runtime();
+    let provider = provider.inner();
+
+    let maybe_schema =
+        rresult_return!(provider.deregister_schema(name.as_str(), cascade));
+
+    RResult::ROk(
+        maybe_schema
+            .map(|schema| FFI_SchemaProvider::new(schema, runtime))
+            .into(),
+    )
+}
+
+unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProvider) {
+    let private_data = Box::from_raw(provider.private_data as *mut 
ProviderPrivateData);
+    drop(private_data);
+}
+
+unsafe extern "C" fn clone_fn_wrapper(
+    provider: &FFI_CatalogProvider,
+) -> FFI_CatalogProvider {
+    let old_private_data = provider.private_data as *const ProviderPrivateData;
+    let runtime = (*old_private_data).runtime.clone();
+
+    let private_data = Box::into_raw(Box::new(ProviderPrivateData {
+        provider: Arc::clone(&(*old_private_data).provider),
+        runtime,
+    })) as *mut c_void;
+
+    FFI_CatalogProvider {
+        schema_names: schema_names_fn_wrapper,
+        schema: schema_fn_wrapper,
+        register_schema: register_schema_fn_wrapper,
+        deregister_schema: deregister_schema_fn_wrapper,
+        clone: clone_fn_wrapper,
+        release: release_fn_wrapper,
+        version: super::version,
+        private_data,
+    }
+}
+
+impl Drop for FFI_CatalogProvider {
+    fn drop(&mut self) {
+        unsafe { (self.release)(self) }
+    }
+}
+
+impl FFI_CatalogProvider {
+    /// Creates a new [`FFI_CatalogProvider`].
+    pub fn new(
+        provider: Arc<dyn CatalogProvider + Send>,
+        runtime: Option<Handle>,
+    ) -> Self {
+        let private_data = Box::new(ProviderPrivateData { provider, runtime });
+
+        Self {
+            schema_names: schema_names_fn_wrapper,
+            schema: schema_fn_wrapper,
+            register_schema: register_schema_fn_wrapper,
+            deregister_schema: deregister_schema_fn_wrapper,
+            clone: clone_fn_wrapper,
+            release: release_fn_wrapper,
+            version: super::version,
+            private_data: Box::into_raw(private_data) as *mut c_void,
+        }
+    }
+}
+
+/// This wrapper struct exists on the receiver side of the FFI interface, so 
it has
+/// no guarantees about being able to access the data in `private_data`. Any 
functions
+/// defined on this struct must only use the stable functions provided in
+/// FFI_CatalogProvider to interact with the foreign catalog provider.
+#[derive(Debug)]
+pub struct ForeignCatalogProvider(FFI_CatalogProvider);
+
+unsafe impl Send for ForeignCatalogProvider {}
+unsafe impl Sync for ForeignCatalogProvider {}
+
+impl From<&FFI_CatalogProvider> for ForeignCatalogProvider {
+    fn from(provider: &FFI_CatalogProvider) -> Self {
+        Self(provider.clone())
+    }
+}
+
+impl Clone for FFI_CatalogProvider {
+    fn clone(&self) -> Self {
+        unsafe { (self.clone)(self) }
+    }
+}
+
+impl CatalogProvider for ForeignCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        unsafe {
+            (self.0.schema_names)(&self.0)
+                .into_iter()
+                .map(|s| s.into())
+                .collect()
+        }
+    }
+
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        unsafe {
+            let maybe_provider: Option<FFI_SchemaProvider> =
+                (self.0.schema)(&self.0, name.into()).into();
+
+            maybe_provider.map(|provider| {
+                Arc::new(ForeignSchemaProvider(provider)) as Arc<dyn 
SchemaProvider>
+            })
+        }
+    }
+
+    fn register_schema(
+        &self,
+        name: &str,
+        schema: Arc<dyn SchemaProvider>,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        unsafe {
+            let schema = match 
schema.as_any().downcast_ref::<ForeignSchemaProvider>() {
+                Some(s) => &s.0,
+                None => &FFI_SchemaProvider::new(schema, None),
+            };
+            let returned_schema: Option<FFI_SchemaProvider> =
+                df_result!((self.0.register_schema)(&self.0, name.into(), 
schema))?
+                    .into();
+
+            Ok(returned_schema
+                .map(|s| Arc::new(ForeignSchemaProvider(s)) as Arc<dyn 
SchemaProvider>))
+        }
+    }
+
+    fn deregister_schema(
+        &self,
+        name: &str,
+        cascade: bool,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        unsafe {
+            let returned_schema: Option<FFI_SchemaProvider> =
+                df_result!((self.0.deregister_schema)(&self.0, name.into(), 
cascade))?
+                    .into();
+
+            Ok(returned_schema
+                .map(|s| Arc::new(ForeignSchemaProvider(s)) as Arc<dyn 
SchemaProvider>))
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider};
+
+    use super::*;
+
+    #[test]
+    fn test_round_trip_ffi_catalog_provider() {
+        let prior_schema = Arc::new(MemorySchemaProvider::new());
+
+        let catalog = Arc::new(MemoryCatalogProvider::new());
+        assert!(catalog
+            .as_ref()
+            .register_schema("prior_schema", prior_schema)
+            .unwrap()
+            .is_none());
+
+        let ffi_catalog = FFI_CatalogProvider::new(catalog, None);
+
+        let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into();
+
+        let prior_schema_names = foreign_catalog.schema_names();
+        assert_eq!(prior_schema_names.len(), 1);
+        assert_eq!(prior_schema_names[0], "prior_schema");
+
+        // Replace an existing schema with one of the same name
+        let returned_schema = foreign_catalog
+            .register_schema("prior_schema", 
Arc::new(MemorySchemaProvider::new()))
+            .expect("Unable to register schema");
+        assert!(returned_schema.is_some());
+        assert_eq!(foreign_catalog.schema_names().len(), 1);
+
+        // Add a new schema name
+        let returned_schema = foreign_catalog
+            .register_schema("second_schema", 
Arc::new(MemorySchemaProvider::new()))
+            .expect("Unable to register schema");
+        assert!(returned_schema.is_none());
+        assert_eq!(foreign_catalog.schema_names().len(), 2);
+
+        // Remove a schema
+        let returned_schema = foreign_catalog
+            .deregister_schema("prior_schema", false)
+            .expect("Unable to deregister schema");
+        assert!(returned_schema.is_some());
+        assert_eq!(foreign_catalog.schema_names().len(), 1);
+
+        // Retrieve non-existent schema
+        let returned_schema = foreign_catalog.schema("prior_schema");
+        assert!(returned_schema.is_none());
+
+        // Retrieve valid schema
+        let returned_schema = foreign_catalog.schema("second_schema");
+        assert!(returned_schema.is_some());
+    }
+}
diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs
index 246e93e026..877129fc5b 100644
--- a/datafusion/ffi/src/lib.rs
+++ b/datafusion/ffi/src/lib.rs
@@ -25,10 +25,12 @@
 #![deny(clippy::clone_on_ref_ptr)]
 
 pub mod arrow_wrappers;
+pub mod catalog_provider;
 pub mod execution_plan;
 pub mod insert_op;
 pub mod plan_properties;
 pub mod record_batch_stream;
+pub mod schema_provider;
 pub mod session_config;
 pub mod table_provider;
 pub mod table_source;
diff --git a/datafusion/ffi/src/schema_provider.rs 
b/datafusion/ffi/src/schema_provider.rs
new file mode 100644
index 0000000000..6e5a590e1a
--- /dev/null
+++ b/datafusion/ffi/src/schema_provider.rs
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, ffi::c_void, sync::Arc};
+
+use abi_stable::{
+    std_types::{ROption, RResult, RString, RVec},
+    StableAbi,
+};
+use async_ffi::{FfiFuture, FutureExt};
+use async_trait::async_trait;
+use datafusion::{
+    catalog::{SchemaProvider, TableProvider},
+    error::DataFusionError,
+};
+use tokio::runtime::Handle;
+
+use crate::{
+    df_result, rresult_return,
+    table_provider::{FFI_TableProvider, ForeignTableProvider},
+};
+
+use datafusion::error::Result;
+
+/// A stable struct for sharing [`SchemaProvider`] across FFI boundaries.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_SchemaProvider {
+    pub owner_name: ROption<RString>,
+
+    pub table_names: unsafe extern "C" fn(provider: &Self) -> RVec<RString>,
+
+    pub table: unsafe extern "C" fn(
+        provider: &Self,
+        name: RString,
+    ) -> FfiFuture<
+        RResult<ROption<FFI_TableProvider>, RString>,
+    >,
+
+    pub register_table:
+        unsafe extern "C" fn(
+            provider: &Self,
+            name: RString,
+            table: FFI_TableProvider,
+        ) -> RResult<ROption<FFI_TableProvider>, RString>,
+
+    pub deregister_table:
+        unsafe extern "C" fn(
+            provider: &Self,
+            name: RString,
+        ) -> RResult<ROption<FFI_TableProvider>, RString>,
+
+    pub table_exist: unsafe extern "C" fn(provider: &Self, name: RString) -> 
bool,
+
+    /// Used to create a clone of this schema provider. This should
+    /// only need to be called by the receiver of the schema provider.
+    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(arg: &mut Self),
+
+    /// Return the major DataFusion version number of this provider.
+    pub version: unsafe extern "C" fn() -> u64,
+
+    /// Internal data. This is only to be accessed by the provider of the schema.
+    /// A [`ForeignSchemaProvider`] should never attempt to access this data.
+    pub private_data: *mut c_void,
+}
+
+unsafe impl Send for FFI_SchemaProvider {}
+unsafe impl Sync for FFI_SchemaProvider {}
+
+struct ProviderPrivateData {
+    provider: Arc<dyn SchemaProvider + Send>,
+    runtime: Option<Handle>,
+}
+
+impl FFI_SchemaProvider {
+    unsafe fn inner(&self) -> &Arc<dyn SchemaProvider + Send> {
+        let private_data = self.private_data as *const ProviderPrivateData;
+        &(*private_data).provider
+    }
+
+    unsafe fn runtime(&self) -> Option<Handle> {
+        let private_data = self.private_data as *const ProviderPrivateData;
+        (*private_data).runtime.clone()
+    }
+}
+
+unsafe extern "C" fn table_names_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+) -> RVec<RString> {
+    let provider = provider.inner();
+
+    let table_names = provider.table_names();
+    table_names.into_iter().map(|s| s.into()).collect()
+}
+
+unsafe extern "C" fn table_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+    name: RString,
+) -> FfiFuture<RResult<ROption<FFI_TableProvider>, RString>> {
+    let runtime = provider.runtime();
+    let provider = Arc::clone(provider.inner());
+
+    async move {
+        let table = rresult_return!(provider.table(name.as_str()).await)
+            .map(|t| FFI_TableProvider::new(t, true, runtime))
+            .into();
+
+        RResult::ROk(table)
+    }
+    .into_ffi()
+}
+
+unsafe extern "C" fn register_table_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+    name: RString,
+    table: FFI_TableProvider,
+) -> RResult<ROption<FFI_TableProvider>, RString> {
+    let runtime = provider.runtime();
+    let provider = provider.inner();
+
+    let table = Arc::new(ForeignTableProvider(table));
+
+    let returned_table = rresult_return!(provider.register_table(name.into(), 
table))
+        .map(|t| FFI_TableProvider::new(t, true, runtime));
+
+    RResult::ROk(returned_table.into())
+}
+
+unsafe extern "C" fn deregister_table_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+    name: RString,
+) -> RResult<ROption<FFI_TableProvider>, RString> {
+    let runtime = provider.runtime();
+    let provider = provider.inner();
+
+    let returned_table = 
rresult_return!(provider.deregister_table(name.as_str()))
+        .map(|t| FFI_TableProvider::new(t, true, runtime));
+
+    RResult::ROk(returned_table.into())
+}
+
+unsafe extern "C" fn table_exist_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+    name: RString,
+) -> bool {
+    provider.inner().table_exist(name.as_str())
+}
+
+unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SchemaProvider) {
+    let private_data = Box::from_raw(provider.private_data as *mut 
ProviderPrivateData);
+    drop(private_data);
+}
+
+unsafe extern "C" fn clone_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+) -> FFI_SchemaProvider {
+    let old_private_data = provider.private_data as *const ProviderPrivateData;
+    let runtime = (*old_private_data).runtime.clone();
+
+    let private_data = Box::into_raw(Box::new(ProviderPrivateData {
+        provider: Arc::clone(&(*old_private_data).provider),
+        runtime,
+    })) as *mut c_void;
+
+    FFI_SchemaProvider {
+        owner_name: provider.owner_name.clone(),
+        table_names: table_names_fn_wrapper,
+        clone: clone_fn_wrapper,
+        release: release_fn_wrapper,
+        version: super::version,
+        private_data,
+        table: table_fn_wrapper,
+        register_table: register_table_fn_wrapper,
+        deregister_table: deregister_table_fn_wrapper,
+        table_exist: table_exist_fn_wrapper,
+    }
+}
+
+impl Drop for FFI_SchemaProvider {
+    fn drop(&mut self) {
+        unsafe { (self.release)(self) }
+    }
+}
+
+impl FFI_SchemaProvider {
+    /// Creates a new [`FFI_SchemaProvider`].
+    pub fn new(
+        provider: Arc<dyn SchemaProvider + Send>,
+        runtime: Option<Handle>,
+    ) -> Self {
+        let owner_name = provider.owner_name().map(|s| s.into()).into();
+        let private_data = Box::new(ProviderPrivateData { provider, runtime });
+
+        Self {
+            owner_name,
+            table_names: table_names_fn_wrapper,
+            clone: clone_fn_wrapper,
+            release: release_fn_wrapper,
+            version: super::version,
+            private_data: Box::into_raw(private_data) as *mut c_void,
+            table: table_fn_wrapper,
+            register_table: register_table_fn_wrapper,
+            deregister_table: deregister_table_fn_wrapper,
+            table_exist: table_exist_fn_wrapper,
+        }
+    }
+}
+
+/// This wrapper struct exists on the receiver side of the FFI interface, so 
it has
+/// no guarantees about being able to access the data in `private_data`. Any 
functions
+/// defined on this struct must only use the stable functions provided in
+/// FFI_SchemaProvider to interact with the foreign schema provider.
+#[derive(Debug)]
+pub struct ForeignSchemaProvider(pub FFI_SchemaProvider);
+
+unsafe impl Send for ForeignSchemaProvider {}
+unsafe impl Sync for ForeignSchemaProvider {}
+
+impl From<&FFI_SchemaProvider> for ForeignSchemaProvider {
+    fn from(provider: &FFI_SchemaProvider) -> Self {
+        Self(provider.clone())
+    }
+}
+
+impl Clone for FFI_SchemaProvider {
+    fn clone(&self) -> Self {
+        unsafe { (self.clone)(self) }
+    }
+}
+
+#[async_trait]
+impl SchemaProvider for ForeignSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn owner_name(&self) -> Option<&str> {
+        let name: Option<&RString> = self.0.owner_name.as_ref().into();
+        name.map(|s| s.as_str())
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        unsafe {
+            (self.0.table_names)(&self.0)
+                .into_iter()
+                .map(|s| s.into())
+                .collect()
+        }
+    }
+
+    async fn table(
+        &self,
+        name: &str,
+    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
+        unsafe {
+            let table: Option<FFI_TableProvider> =
+                df_result!((self.0.table)(&self.0, name.into()).await)?.into();
+
+            let table = table.as_ref().map(|t| {
+                Arc::new(ForeignTableProvider::from(t)) as Arc<dyn 
TableProvider>
+            });
+
+            Ok(table)
+        }
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> Result<Option<Arc<dyn TableProvider>>> {
+        unsafe {
+            let ffi_table = match 
table.as_any().downcast_ref::<ForeignTableProvider>() {
+                Some(t) => t.0.clone(),
+                None => FFI_TableProvider::new(table, true, None),
+            };
+
+            let returned_provider: Option<FFI_TableProvider> =
+                df_result!((self.0.register_table)(&self.0, name.into(), 
ffi_table))?
+                    .into();
+
+            Ok(returned_provider
+                .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn 
TableProvider>))
+        }
+    }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn 
TableProvider>>> {
+        let returned_provider: Option<FFI_TableProvider> = unsafe {
+            df_result!((self.0.deregister_table)(&self.0, name.into()))?.into()
+        };
+
+        Ok(returned_provider
+            .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn 
TableProvider>))
+    }
+
+    /// Returns true if table exist in the schema provider, false otherwise.
+    fn table_exist(&self, name: &str) -> bool {
+        unsafe { (self.0.table_exist)(&self.0, name.into()) }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::Schema;
+    use datafusion::{catalog::MemorySchemaProvider, 
datasource::empty::EmptyTable};
+
+    use super::*;
+
+    fn empty_table() -> Arc<dyn TableProvider> {
+        Arc::new(EmptyTable::new(Arc::new(Schema::empty())))
+    }
+
+    #[tokio::test]
+    async fn test_round_trip_ffi_schema_provider() {
+        let schema_provider = Arc::new(MemorySchemaProvider::new());
+        assert!(schema_provider
+            .as_ref()
+            .register_table("prior_table".to_string(), empty_table())
+            .unwrap()
+            .is_none());
+
+        let ffi_schema_provider = FFI_SchemaProvider::new(schema_provider, 
None);
+
+        let foreign_schema_provider: ForeignSchemaProvider =
+            (&ffi_schema_provider).into();
+
+        let prior_table_names = foreign_schema_provider.table_names();
+        assert_eq!(prior_table_names.len(), 1);
+        assert_eq!(prior_table_names[0], "prior_table");
+
+        // Replacing an existing table with one of the same name generates an error
+        let returned_schema = foreign_schema_provider
+            .register_table("prior_table".to_string(), empty_table());
+        assert!(returned_schema.is_err());
+        assert_eq!(foreign_schema_provider.table_names().len(), 1);
+
+        // Add a new table
+        let returned_schema = foreign_schema_provider
+            .register_table("second_table".to_string(), empty_table())
+            .expect("Unable to register table");
+        assert!(returned_schema.is_none());
+        assert_eq!(foreign_schema_provider.table_names().len(), 2);
+
+        // Remove a table
+        let returned_schema = foreign_schema_provider
+            .deregister_table("prior_table")
+            .expect("Unable to deregister table");
+        assert!(returned_schema.is_some());
+        assert_eq!(foreign_schema_provider.table_names().len(), 1);
+
+        // Retrieve non-existent table
+        let returned_schema = foreign_schema_provider
+            .table("prior_table")
+            .await
+            .expect("Unable to query table");
+        assert!(returned_schema.is_none());
+        assert!(!foreign_schema_provider.table_exist("prior_table"));
+
+        // Retrieve valid table
+        let returned_schema = foreign_schema_provider
+            .table("second_table")
+            .await
+            .expect("Unable to query table");
+        assert!(returned_schema.is_some());
+        assert!(foreign_schema_provider.table_exist("second_table"));
+    }
+}
diff --git a/datafusion/ffi/src/table_provider.rs 
b/datafusion/ffi/src/table_provider.rs
index 0b4080abcb..a7391a8503 100644
--- a/datafusion/ffi/src/table_provider.rs
+++ b/datafusion/ffi/src/table_provider.rs
@@ -382,7 +382,7 @@ impl FFI_TableProvider {
 /// defined on this struct must only use the stable functions provided in
 /// FFI_TableProvider to interact with the foreign table provider.
 #[derive(Debug)]
-pub struct ForeignTableProvider(FFI_TableProvider);
+pub struct ForeignTableProvider(pub FFI_TableProvider);
 
 unsafe impl Send for ForeignTableProvider {}
 unsafe impl Sync for ForeignTableProvider {}
diff --git a/datafusion/ffi/src/tests/catalog.rs 
b/datafusion/ffi/src/tests/catalog.rs
new file mode 100644
index 0000000000..f4293adb41
--- /dev/null
+++ b/datafusion/ffi/src/tests/catalog.rs
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module provides test implementations of a catalog provider and a
+//! schema provider that are exposed through the FFI interface. The
+//! [`FixedSchemaProvider`] is prepopulated with a single `purchases` table
+//! and only accepts tables named `sales` and `purchases`. The
+//! [`FixedCatalogProvider`] is prepopulated with a single `apple` schema and
+//! only accepts schemas named `apple`, `banana`, `cherry`, and `date`. The
+//! [`create_catalog_provider`] function wraps the catalog in an
+//! [`FFI_CatalogProvider`] with no tokio runtime handle so it can be shared
+//! across the FFI boundary. This code serves as an integration test fixture.
+
+use std::{any::Any, fmt::Debug, sync::Arc};
+
+use crate::catalog_provider::FFI_CatalogProvider;
+use arrow::datatypes::Schema;
+use async_trait::async_trait;
+use datafusion::{
+    catalog::{
+        CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider, 
SchemaProvider,
+        TableProvider,
+    },
+    common::exec_err,
+    datasource::MemTable,
+    error::{DataFusionError, Result},
+};
+
+/// This schema provider is intended only for unit tests. It prepopulates with 
one
+/// table and only allows for tables named sales and purchases.
+#[derive(Debug)]
+pub struct FixedSchemaProvider {
+    inner: MemorySchemaProvider,
+}
+
+pub fn fruit_table() -> Arc<dyn TableProvider + 'static> {
+    use arrow::datatypes::{DataType, Field};
+    use datafusion::common::record_batch;
+
+    let schema = Arc::new(Schema::new(vec![
+        Field::new("units", DataType::Int32, true),
+        Field::new("price", DataType::Float64, true),
+    ]));
+
+    let partitions = vec![
+        record_batch!(
+            ("units", Int32, vec![10, 20, 30]),
+            ("price", Float64, vec![1.0, 2.0, 5.0])
+        )
+        .unwrap(),
+        record_batch!(
+            ("units", Int32, vec![5, 7]),
+            ("price", Float64, vec![1.5, 2.5])
+        )
+        .unwrap(),
+    ];
+
+    Arc::new(MemTable::try_new(schema, vec![partitions]).unwrap())
+}
+
+impl Default for FixedSchemaProvider {
+    fn default() -> Self {
+        let inner = MemorySchemaProvider::new();
+
+        let table = fruit_table();
+
+        let _ = inner
+            .register_table("purchases".to_string(), table)
+            .unwrap();
+
+        Self { inner }
+    }
+}
+
+#[async_trait]
+impl SchemaProvider for FixedSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.inner.table_names()
+    }
+
+    async fn table(
+        &self,
+        name: &str,
+    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
+        self.inner.table(name).await
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.inner.table_exist(name)
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> Result<Option<Arc<dyn TableProvider>>> {
+        if name.as_str() != "sales" && name.as_str() != "purchases" {
+            return exec_err!(
+                "FixedSchemaProvider only provides two tables: sales and 
purchases"
+            );
+        }
+
+        self.inner.register_table(name, table)
+    }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn 
TableProvider>>> {
+        self.inner.deregister_table(name)
+    }
+}
+
+/// This catalog provider is intended only for unit tests. It prepopulates 
with one
+/// schema and only allows for schemas named after four types of fruit.
+#[derive(Debug)]
+pub struct FixedCatalogProvider {
+    inner: MemoryCatalogProvider,
+}
+
+impl Default for FixedCatalogProvider {
+    fn default() -> Self {
+        let inner = MemoryCatalogProvider::new();
+
+        let _ = inner.register_schema("apple", 
Arc::new(FixedSchemaProvider::default()));
+
+        Self { inner }
+    }
+}
+
+impl CatalogProvider for FixedCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        self.inner.schema_names()
+    }
+
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        self.inner.schema(name)
+    }
+
+    fn register_schema(
+        &self,
+        name: &str,
+        schema: Arc<dyn SchemaProvider>,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        if !["apple", "banana", "cherry", "date"].contains(&name) {
+            return exec_err!("FixedCatalogProvider only provides four schemas: 
apple, banana, cherry, date");
+        }
+
+        self.inner.register_schema(name, schema)
+    }
+
+    fn deregister_schema(
+        &self,
+        name: &str,
+        cascade: bool,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        self.inner.deregister_schema(name, cascade)
+    }
+}
+
+pub(crate) extern "C" fn create_catalog_provider() -> FFI_CatalogProvider {
+    let catalog_provider = Arc::new(FixedCatalogProvider::default());
+    FFI_CatalogProvider::new(catalog_provider, None)
+}
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index 5a471cb8fe..4b4a29276d 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -25,6 +25,9 @@ use abi_stable::{
     sabi_types::VersionStrings,
     StableAbi,
 };
+use catalog::create_catalog_provider;
+
+use crate::catalog_provider::FFI_CatalogProvider;
 
 use super::{table_provider::FFI_TableProvider, udf::FFI_ScalarUDF};
 use arrow::array::RecordBatch;
@@ -37,6 +40,7 @@ use sync_provider::create_sync_table_provider;
 use udf_udaf_udwf::create_ffi_abs_func;
 
 mod async_provider;
+pub mod catalog;
 mod sync_provider;
 mod udf_udaf_udwf;
 
@@ -47,6 +51,9 @@ mod udf_udaf_udwf;
 /// both the module loading program and library that implements the
 /// module.
 pub struct ForeignLibraryModule {
+    /// Construct an opinionated catalog provider
+    pub create_catalog: extern "C" fn() -> FFI_CatalogProvider,
+
     /// Constructs the table provider
     pub create_table: extern "C" fn(synchronous: bool) -> FFI_TableProvider,
 
@@ -95,6 +102,7 @@ extern "C" fn construct_table_provider(synchronous: bool) -> FFI_TableProvider {
 /// This defines the entry point for using the module.
 pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
     ForeignLibraryModule {
+        create_catalog: create_catalog_provider,
         create_table: construct_table_provider,
         create_scalar_udf: create_ffi_abs_func,
         version: super::version,
diff --git a/datafusion/ffi/tests/ffi_integration.rs b/datafusion/ffi/tests/ffi_integration.rs
index 84e120df42..f610f12c82 100644
--- a/datafusion/ffi/tests/ffi_integration.rs
+++ b/datafusion/ffi/tests/ffi_integration.rs
@@ -25,6 +25,7 @@ mod tests {
     use datafusion::error::{DataFusionError, Result};
     use datafusion::logical_expr::ScalarUDF;
     use datafusion::prelude::{col, SessionContext};
+    use datafusion_ffi::catalog_provider::ForeignCatalogProvider;
     use datafusion_ffi::table_provider::ForeignTableProvider;
     use datafusion_ffi::tests::{create_record_batch, ForeignLibraryModuleRef};
     use datafusion_ffi::udf::ForeignScalarUDF;
@@ -179,4 +180,30 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_catalog() -> Result<()> {
+        let module = get_module()?;
+
+        let ffi_catalog =
+            module
+                .create_catalog()
+                .ok_or(DataFusionError::NotImplemented(
+                    "External catalog provider failed to implement create_catalog"
+                        .to_string(),
+                ))?();
+        let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into();
+
+        let ctx = SessionContext::default();
+        let _ = ctx.register_catalog("fruit", Arc::new(foreign_catalog));
+
+        let df = ctx.table("fruit.apple.purchases").await?;
+
+        let results = df.collect().await?;
+
+        assert!(!results.is_empty());
+        assert!(results[0].num_rows() != 0);
+
+        Ok(())
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to