This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 72705a31e2 Add CatalogProvider and SchemaProvider to FFI Crate (#15280)
72705a31e2 is described below
commit 72705a31e219458fd152694df816e54be940f32c
Author: Tim Saucer <[email protected]>
AuthorDate: Tue Mar 18 07:19:22 2025 -0400
Add CatalogProvider and SchemaProvider to FFI Crate (#15280)
* initial commit for schema and catalog providers
* Add unit tests
* Update docstrings
* Add integration test
---
datafusion/ffi/src/catalog_provider.rs | 338 ++++++++++++++++++++++++++++
datafusion/ffi/src/lib.rs | 2 +
datafusion/ffi/src/schema_provider.rs | 385 ++++++++++++++++++++++++++++++++
datafusion/ffi/src/table_provider.rs | 2 +-
datafusion/ffi/src/tests/catalog.rs | 183 +++++++++++++++
datafusion/ffi/src/tests/mod.rs | 8 +
datafusion/ffi/tests/ffi_integration.rs | 27 +++
7 files changed, 944 insertions(+), 1 deletion(-)
diff --git a/datafusion/ffi/src/catalog_provider.rs
b/datafusion/ffi/src/catalog_provider.rs
new file mode 100644
index 0000000000..0886d4749d
--- /dev/null
+++ b/datafusion/ffi/src/catalog_provider.rs
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, ffi::c_void, sync::Arc};
+
+use abi_stable::{
+ std_types::{ROption, RResult, RString, RVec},
+ StableAbi,
+};
+use datafusion::catalog::{CatalogProvider, SchemaProvider};
+use tokio::runtime::Handle;
+
+use crate::{
+ df_result, rresult_return,
+ schema_provider::{FFI_SchemaProvider, ForeignSchemaProvider},
+};
+
+use datafusion::error::Result;
+
+/// A stable struct for sharing [`CatalogProvider`] across FFI boundaries.
+///
+/// Every operation is exposed as an `extern "C"` function pointer so that the
+/// library creating this struct and the program consuming it do not need to
+/// share a Rust ABI. Consumers should wrap it in a [`ForeignCatalogProvider`]
+/// rather than calling the function pointers directly.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_CatalogProvider {
+    /// Return the names of every schema in this catalog.
+    pub schema_names: unsafe extern "C" fn(provider: &Self) -> RVec<RString>,
+
+    /// Look up the schema called `name`, returning `RNone` if it is absent.
+    pub schema: unsafe extern "C" fn(
+        provider: &Self,
+        name: RString,
+    ) -> ROption<FFI_SchemaProvider>,
+
+    /// Register `schema` under `name`, returning the schema it replaced (if
+    /// any). Errors are reported as an `RString` message.
+    pub register_schema:
+        unsafe extern "C" fn(
+            provider: &Self,
+            name: RString,
+            schema: &FFI_SchemaProvider,
+        ) -> RResult<ROption<FFI_SchemaProvider>, RString>,
+
+    /// Remove the schema called `name`, returning it if it was present.
+    /// `cascade` is passed through to [`CatalogProvider::deregister_schema`].
+    /// Errors are reported as an `RString` message.
+    pub deregister_schema:
+        unsafe extern "C" fn(
+            provider: &Self,
+            name: RString,
+            cascade: bool,
+        ) -> RResult<ROption<FFI_SchemaProvider>, RString>,
+
+    /// Used to create a clone of this catalog provider. This should only need
+    /// to be called by the receiver of the catalog provider.
+    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(arg: &mut Self),
+
+    /// Return the major DataFusion version number of this provider.
+    pub version: unsafe extern "C" fn() -> u64,
+
+    /// Internal data. This is only to be accessed by the library that created
+    /// this catalog provider. A [`ForeignCatalogProvider`] should never attempt
+    /// to access this data.
+    pub private_data: *mut c_void,
+}
+
+// NOTE(review): these impls assert that the struct may cross threads. The
+// boxed state behind `private_data` holds an `Arc<dyn CatalogProvider + Send>`;
+// thread-safety of the wrapped provider is assumed from its trait bounds.
+unsafe impl Send for FFI_CatalogProvider {}
+unsafe impl Sync for FFI_CatalogProvider {}
+
+/// Producer-side state stored behind [`FFI_CatalogProvider::private_data`].
+struct ProviderPrivateData {
+    provider: Arc<dyn CatalogProvider + Send>,
+    runtime: Option<Handle>,
+}
+
+impl FFI_CatalogProvider {
+    /// Borrow the wrapped provider.
+    ///
+    /// # Safety
+    /// Only valid on the producer side, where `private_data` points at a live
+    /// `ProviderPrivateData`.
+    unsafe fn inner(&self) -> &Arc<dyn CatalogProvider + Send> {
+        let state = &*(self.private_data as *const ProviderPrivateData);
+        &state.provider
+    }
+
+    /// Clone the optional tokio runtime handle stored with the provider.
+    ///
+    /// # Safety
+    /// Same requirements as [`FFI_CatalogProvider::inner`].
+    unsafe fn runtime(&self) -> Option<Handle> {
+        let state = &*(self.private_data as *const ProviderPrivateData);
+        state.runtime.clone()
+    }
+}
+
+/// Producer-side implementation of [`FFI_CatalogProvider::schema_names`].
+unsafe extern "C" fn schema_names_fn_wrapper(
+    provider: &FFI_CatalogProvider,
+) -> RVec<RString> {
+    provider
+        .inner()
+        .schema_names()
+        .into_iter()
+        .map(RString::from)
+        .collect()
+}
+
+/// Producer-side implementation of [`FFI_CatalogProvider::schema`]. A found
+/// schema is exported with this catalog's runtime handle.
+unsafe extern "C" fn schema_fn_wrapper(
+    provider: &FFI_CatalogProvider,
+    name: RString,
+) -> ROption<FFI_SchemaProvider> {
+    match provider.inner().schema(name.as_str()) {
+        Some(found) => {
+            ROption::RSome(FFI_SchemaProvider::new(found, provider.runtime()))
+        }
+        None => ROption::RNone,
+    }
+}
+
+/// Producer-side implementation of [`FFI_CatalogProvider::register_schema`].
+///
+/// The incoming schema is wrapped as a [`ForeignSchemaProvider`]; any schema
+/// it replaces is exported back with this catalog's runtime handle.
+unsafe extern "C" fn register_schema_fn_wrapper(
+    provider: &FFI_CatalogProvider,
+    name: RString,
+    schema: &FFI_SchemaProvider,
+) -> RResult<ROption<FFI_SchemaProvider>, RString> {
+    let runtime = provider.runtime();
+    let catalog = provider.inner();
+    let incoming = Arc::new(ForeignSchemaProvider::from(schema));
+
+    let replaced =
+        rresult_return!(catalog.register_schema(name.as_str(), incoming));
+
+    RResult::ROk(match replaced {
+        Some(old) => ROption::RSome(FFI_SchemaProvider::new(old, runtime)),
+        None => ROption::RNone,
+    })
+}
+
+/// Producer-side implementation of [`FFI_CatalogProvider::deregister_schema`].
+/// A removed schema is exported back with this catalog's runtime handle.
+unsafe extern "C" fn deregister_schema_fn_wrapper(
+    provider: &FFI_CatalogProvider,
+    name: RString,
+    cascade: bool,
+) -> RResult<ROption<FFI_SchemaProvider>, RString> {
+    let runtime = provider.runtime();
+    let catalog = provider.inner();
+
+    let removed = rresult_return!(catalog.deregister_schema(name.as_str(), cascade));
+
+    let exported = match removed {
+        Some(old) => ROption::RSome(FFI_SchemaProvider::new(old, runtime)),
+        None => ROption::RNone,
+    };
+    RResult::ROk(exported)
+}
+
+/// Free the boxed `ProviderPrivateData` that `new`/`clone_fn_wrapper` leaked
+/// into `private_data`. Invoked from `Drop for FFI_CatalogProvider`.
+unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_CatalogProvider) {
+    let state = provider.private_data as *mut ProviderPrivateData;
+    drop(Box::from_raw(state));
+}
+
+/// Producer-side clone: builds a fresh [`FFI_CatalogProvider`] that shares the
+/// same provider `Arc` and runtime handle but owns its own private data.
+unsafe extern "C" fn clone_fn_wrapper(
+    provider: &FFI_CatalogProvider,
+) -> FFI_CatalogProvider {
+    let shared_provider = Arc::clone(provider.inner());
+    let shared_runtime = provider.runtime();
+    FFI_CatalogProvider::new(shared_provider, shared_runtime)
+}
+
+impl Drop for FFI_CatalogProvider {
+    fn drop(&mut self) {
+        // SAFETY: `release` was installed by the producer together with the
+        // `private_data` it frees, and `drop` runs exactly once.
+        unsafe { (self.release)(self) }
+    }
+}
+
+impl FFI_CatalogProvider {
+    /// Creates a new [`FFI_CatalogProvider`].
+    ///
+    /// `runtime` is stored next to `provider` and is handed to every
+    /// [`FFI_SchemaProvider`] this catalog exports.
+    pub fn new(
+        provider: Arc<dyn CatalogProvider + Send>,
+        runtime: Option<Handle>,
+    ) -> Self {
+        let state = Box::new(ProviderPrivateData { provider, runtime });
+        let private_data = Box::into_raw(state) as *mut c_void;
+
+        Self {
+            schema_names: schema_names_fn_wrapper,
+            schema: schema_fn_wrapper,
+            register_schema: register_schema_fn_wrapper,
+            deregister_schema: deregister_schema_fn_wrapper,
+            clone: clone_fn_wrapper,
+            release: release_fn_wrapper,
+            version: super::version,
+            private_data,
+        }
+    }
+}
+
+/// This wrapper struct exists on the receiver side of the FFI interface, so it has
+/// no guarantees about being able to access the data in `private_data`. Any functions
+/// defined on this struct must only use the stable functions provided in
+/// FFI_CatalogProvider to interact with the foreign catalog provider.
+#[derive(Debug)]
+pub struct ForeignCatalogProvider(FFI_CatalogProvider);
+
+unsafe impl Send for ForeignCatalogProvider {}
+unsafe impl Sync for ForeignCatalogProvider {}
+
+impl From<&FFI_CatalogProvider> for ForeignCatalogProvider {
+    /// Wraps a clone of `provider`, obtained through its FFI `clone` callback.
+    fn from(provider: &FFI_CatalogProvider) -> Self {
+        Self(provider.clone())
+    }
+}
+
+impl Clone for FFI_CatalogProvider {
+    /// Delegates to the producer's `clone` function pointer.
+    fn clone(&self) -> Self {
+        unsafe { (self.clone)(self) }
+    }
+}
+
+impl CatalogProvider for ForeignCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        // SAFETY: the function pointer and `self.0` come from the same
+        // `FFI_CatalogProvider`, which this wrapper owns.
+        let names = unsafe { (self.0.schema_names)(&self.0) };
+        names.into_iter().map(Into::into).collect()
+    }
+
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        // SAFETY: see `schema_names`.
+        let found: Option<FFI_SchemaProvider> =
+            unsafe { (self.0.schema)(&self.0, name.into()) }.into();
+
+        found.map(|ffi| Arc::new(ForeignSchemaProvider(ffi)) as Arc<dyn SchemaProvider>)
+    }
+
+    fn register_schema(
+        &self,
+        name: &str,
+        schema: Arc<dyn SchemaProvider>,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        // A schema that already came across the FFI boundary is handed back
+        // as is; any other schema is wrapped (without a runtime) for export.
+        let exported;
+        let ffi_schema = match schema.as_any().downcast_ref::<ForeignSchemaProvider>() {
+            Some(foreign) => &foreign.0,
+            None => {
+                exported = FFI_SchemaProvider::new(schema, None);
+                &exported
+            }
+        };
+
+        // SAFETY: see `schema_names`.
+        let outcome = unsafe {
+            (self.0.register_schema)(&self.0, name.into(), ffi_schema)
+        };
+        let replaced: Option<FFI_SchemaProvider> = df_result!(outcome)?.into();
+
+        Ok(replaced
+            .map(|ffi| Arc::new(ForeignSchemaProvider(ffi)) as Arc<dyn SchemaProvider>))
+    }
+
+    fn deregister_schema(
+        &self,
+        name: &str,
+        cascade: bool,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        // SAFETY: see `schema_names`.
+        let outcome =
+            unsafe { (self.0.deregister_schema)(&self.0, name.into(), cascade) };
+        let removed: Option<FFI_SchemaProvider> = df_result!(outcome)?.into();
+
+        Ok(removed
+            .map(|ffi| Arc::new(ForeignSchemaProvider(ffi)) as Arc<dyn SchemaProvider>))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use datafusion::catalog::{MemoryCatalogProvider, MemorySchemaProvider};
+
+    use super::*;
+
+    /// Drives every `CatalogProvider` method through an FFI round trip:
+    /// local catalog -> `FFI_CatalogProvider` -> `ForeignCatalogProvider`.
+    #[test]
+    fn test_round_trip_ffi_catalog_provider() {
+        let local_catalog = Arc::new(MemoryCatalogProvider::new());
+        let previous = local_catalog
+            .as_ref()
+            .register_schema("prior_schema", Arc::new(MemorySchemaProvider::new()))
+            .unwrap();
+        assert!(previous.is_none());
+
+        let ffi_catalog = FFI_CatalogProvider::new(local_catalog, None);
+        let foreign_catalog = ForeignCatalogProvider::from(&ffi_catalog);
+
+        assert_eq!(foreign_catalog.schema_names(), vec!["prior_schema".to_string()]);
+
+        // Registering over an existing name hands back the replaced schema.
+        let replaced = foreign_catalog
+            .register_schema("prior_schema", Arc::new(MemorySchemaProvider::new()))
+            .expect("Unable to register schema");
+        assert!(replaced.is_some());
+        assert_eq!(foreign_catalog.schema_names().len(), 1);
+
+        // A new name adds a second schema.
+        let replaced = foreign_catalog
+            .register_schema("second_schema", Arc::new(MemorySchemaProvider::new()))
+            .expect("Unable to register schema");
+        assert!(replaced.is_none());
+        assert_eq!(foreign_catalog.schema_names().len(), 2);
+
+        // Deregistering returns the removed schema.
+        let removed = foreign_catalog
+            .deregister_schema("prior_schema", false)
+            .expect("Unable to deregister schema");
+        assert!(removed.is_some());
+        assert_eq!(foreign_catalog.schema_names().len(), 1);
+
+        // Lookups: the removed (non-existent) schema is gone, the other remains.
+        assert!(foreign_catalog.schema("prior_schema").is_none());
+        assert!(foreign_catalog.schema("second_schema").is_some());
+    }
+}
diff --git a/datafusion/ffi/src/lib.rs b/datafusion/ffi/src/lib.rs
index 246e93e026..877129fc5b 100644
--- a/datafusion/ffi/src/lib.rs
+++ b/datafusion/ffi/src/lib.rs
@@ -25,10 +25,12 @@
#![deny(clippy::clone_on_ref_ptr)]
pub mod arrow_wrappers;
+pub mod catalog_provider;
pub mod execution_plan;
pub mod insert_op;
pub mod plan_properties;
pub mod record_batch_stream;
+pub mod schema_provider;
pub mod session_config;
pub mod table_provider;
pub mod table_source;
diff --git a/datafusion/ffi/src/schema_provider.rs
b/datafusion/ffi/src/schema_provider.rs
new file mode 100644
index 0000000000..6e5a590e1a
--- /dev/null
+++ b/datafusion/ffi/src/schema_provider.rs
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{any::Any, ffi::c_void, sync::Arc};
+
+use abi_stable::{
+ std_types::{ROption, RResult, RString, RVec},
+ StableAbi,
+};
+use async_ffi::{FfiFuture, FutureExt};
+use async_trait::async_trait;
+use datafusion::{
+ catalog::{SchemaProvider, TableProvider},
+ error::DataFusionError,
+};
+use tokio::runtime::Handle;
+
+use crate::{
+ df_result, rresult_return,
+ table_provider::{FFI_TableProvider, ForeignTableProvider},
+};
+
+use datafusion::error::Result;
+
+/// A stable struct for sharing [`SchemaProvider`] across FFI boundaries.
+///
+/// Every operation is exposed as an `extern "C"` function pointer so that the
+/// library creating this struct and the program consuming it do not need to
+/// share a Rust ABI. Consumers should wrap it in a [`ForeignSchemaProvider`]
+/// rather than calling the function pointers directly.
+#[repr(C)]
+#[derive(Debug, StableAbi)]
+#[allow(non_camel_case_types)]
+pub struct FFI_SchemaProvider {
+    /// Owner name reported by the wrapped provider when this struct was created.
+    pub owner_name: ROption<RString>,
+
+    /// Return the names of every table in this schema.
+    pub table_names: unsafe extern "C" fn(provider: &Self) -> RVec<RString>,
+
+    /// Asynchronously look up the table called `name`, resolving to `RNone`
+    /// if it is absent. Errors are reported as an `RString` message.
+    pub table: unsafe extern "C" fn(
+        provider: &Self,
+        name: RString,
+    ) -> FfiFuture<
+        RResult<ROption<FFI_TableProvider>, RString>,
+    >,
+
+    /// Register `table` (taken by value) under `name`, returning the table it
+    /// replaced, if any. Errors are reported as an `RString` message.
+    pub register_table:
+        unsafe extern "C" fn(
+            provider: &Self,
+            name: RString,
+            table: FFI_TableProvider,
+        ) -> RResult<ROption<FFI_TableProvider>, RString>,
+
+    /// Remove the table called `name`, returning it if it was present.
+    /// Errors are reported as an `RString` message.
+    pub deregister_table:
+        unsafe extern "C" fn(
+            provider: &Self,
+            name: RString,
+        ) -> RResult<ROption<FFI_TableProvider>, RString>,
+
+    /// Return `true` if a table called `name` exists in this schema.
+    pub table_exist: unsafe extern "C" fn(provider: &Self, name: RString) -> bool,
+
+    /// Used to create a clone of this schema provider. This should only need
+    /// to be called by the receiver of the schema provider.
+    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
+
+    /// Release the memory of the private data when it is no longer being used.
+    pub release: unsafe extern "C" fn(arg: &mut Self),
+
+    /// Return the major DataFusion version number of this provider.
+    pub version: unsafe extern "C" fn() -> u64,
+
+    /// Internal data. This is only to be accessed by the library that created
+    /// this schema provider. A [`ForeignSchemaProvider`] should never attempt
+    /// to access this data.
+    pub private_data: *mut c_void,
+}
+
+// NOTE(review): these impls assert that the struct may cross threads. The
+// boxed state behind `private_data` holds an `Arc<dyn SchemaProvider + Send>`;
+// thread-safety of the wrapped provider is assumed from its trait bounds.
+unsafe impl Send for FFI_SchemaProvider {}
+unsafe impl Sync for FFI_SchemaProvider {}
+
+/// Producer-side state stored behind [`FFI_SchemaProvider::private_data`].
+struct ProviderPrivateData {
+    provider: Arc<dyn SchemaProvider + Send>,
+    runtime: Option<Handle>,
+}
+
+impl FFI_SchemaProvider {
+    /// Borrow the wrapped provider.
+    ///
+    /// # Safety
+    /// Only valid on the producer side, where `private_data` points at a live
+    /// `ProviderPrivateData`.
+    unsafe fn inner(&self) -> &Arc<dyn SchemaProvider + Send> {
+        let state = &*(self.private_data as *const ProviderPrivateData);
+        &state.provider
+    }
+
+    /// Clone the optional tokio runtime handle stored with the provider.
+    ///
+    /// # Safety
+    /// Same requirements as [`FFI_SchemaProvider::inner`].
+    unsafe fn runtime(&self) -> Option<Handle> {
+        let state = &*(self.private_data as *const ProviderPrivateData);
+        state.runtime.clone()
+    }
+}
+
+/// Producer-side implementation of [`FFI_SchemaProvider::table_names`].
+unsafe extern "C" fn table_names_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+) -> RVec<RString> {
+    provider
+        .inner()
+        .table_names()
+        .into_iter()
+        .map(RString::from)
+        .collect()
+}
+
+/// Producer-side implementation of [`FFI_SchemaProvider::table`].
+///
+/// Owned copies of the provider `Arc` and runtime handle are moved into the
+/// future so it does not borrow `provider`. A found table is exported with
+/// filter pushdown enabled and this schema's runtime handle.
+unsafe extern "C" fn table_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+    name: RString,
+) -> FfiFuture<RResult<ROption<FFI_TableProvider>, RString>> {
+    let runtime = provider.runtime();
+    let schema = Arc::clone(provider.inner());
+
+    let lookup = async move {
+        let found = rresult_return!(schema.table(name.as_str()).await);
+        let exported = match found {
+            Some(table) => ROption::RSome(FFI_TableProvider::new(table, true, runtime)),
+            None => ROption::RNone,
+        };
+        RResult::ROk(exported)
+    };
+    lookup.into_ffi()
+}
+
+/// Producer-side implementation of [`FFI_SchemaProvider::register_table`].
+///
+/// The incoming table is wrapped as a [`ForeignTableProvider`]; any table it
+/// replaces is exported back with this schema's runtime handle.
+unsafe extern "C" fn register_table_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+    name: RString,
+    table: FFI_TableProvider,
+) -> RResult<ROption<FFI_TableProvider>, RString> {
+    let runtime = provider.runtime();
+    let schema = provider.inner();
+    let incoming = Arc::new(ForeignTableProvider(table));
+
+    let replaced = rresult_return!(schema.register_table(name.into(), incoming));
+
+    RResult::ROk(match replaced {
+        Some(old) => ROption::RSome(FFI_TableProvider::new(old, true, runtime)),
+        None => ROption::RNone,
+    })
+}
+
+/// Producer-side implementation of [`FFI_SchemaProvider::deregister_table`].
+/// A removed table is exported back with this schema's runtime handle.
+unsafe extern "C" fn deregister_table_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+    name: RString,
+) -> RResult<ROption<FFI_TableProvider>, RString> {
+    let runtime = provider.runtime();
+    let schema = provider.inner();
+
+    let removed = rresult_return!(schema.deregister_table(name.as_str()));
+
+    let exported = match removed {
+        Some(old) => ROption::RSome(FFI_TableProvider::new(old, true, runtime)),
+        None => ROption::RNone,
+    };
+    RResult::ROk(exported)
+}
+
+/// Producer-side implementation of [`FFI_SchemaProvider::table_exist`].
+unsafe extern "C" fn table_exist_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+    name: RString,
+) -> bool {
+    let schema = provider.inner();
+    schema.table_exist(name.as_str())
+}
+
+/// Free the boxed `ProviderPrivateData` that `new`/`clone_fn_wrapper` leaked
+/// into `private_data`. Invoked from `Drop for FFI_SchemaProvider`.
+unsafe extern "C" fn release_fn_wrapper(provider: &mut FFI_SchemaProvider) {
+    let state = provider.private_data as *mut ProviderPrivateData;
+    drop(Box::from_raw(state));
+}
+
+/// Producer-side clone: builds a fresh [`FFI_SchemaProvider`] that shares the
+/// same provider `Arc`, runtime handle and cached `owner_name`, but owns its
+/// own private data allocation.
+unsafe extern "C" fn clone_fn_wrapper(
+    provider: &FFI_SchemaProvider,
+) -> FFI_SchemaProvider {
+    let source = &*(provider.private_data as *const ProviderPrivateData);
+    let runtime = source.runtime.clone();
+    let state = Box::new(ProviderPrivateData {
+        provider: Arc::clone(&source.provider),
+        runtime,
+    });
+
+    FFI_SchemaProvider {
+        owner_name: provider.owner_name.clone(),
+        table_names: table_names_fn_wrapper,
+        table: table_fn_wrapper,
+        register_table: register_table_fn_wrapper,
+        deregister_table: deregister_table_fn_wrapper,
+        table_exist: table_exist_fn_wrapper,
+        clone: clone_fn_wrapper,
+        release: release_fn_wrapper,
+        version: super::version,
+        private_data: Box::into_raw(state) as *mut c_void,
+    }
+}
+
+impl Drop for FFI_SchemaProvider {
+    fn drop(&mut self) {
+        // SAFETY: `release` was installed by the producer together with the
+        // `private_data` it frees, and `drop` runs exactly once.
+        unsafe { (self.release)(self) }
+    }
+}
+
+impl FFI_SchemaProvider {
+    /// Creates a new [`FFI_SchemaProvider`].
+    ///
+    /// The provider's `owner_name` is captured once here. `runtime` is stored
+    /// next to `provider` and handed to every [`FFI_TableProvider`] this
+    /// schema exports.
+    pub fn new(
+        provider: Arc<dyn SchemaProvider + Send>,
+        runtime: Option<Handle>,
+    ) -> Self {
+        let owner_name: ROption<RString> = provider.owner_name().map(RString::from).into();
+        let state = Box::new(ProviderPrivateData { provider, runtime });
+        let private_data = Box::into_raw(state) as *mut c_void;
+
+        Self {
+            owner_name,
+            table_names: table_names_fn_wrapper,
+            table: table_fn_wrapper,
+            register_table: register_table_fn_wrapper,
+            deregister_table: deregister_table_fn_wrapper,
+            table_exist: table_exist_fn_wrapper,
+            clone: clone_fn_wrapper,
+            release: release_fn_wrapper,
+            version: super::version,
+            private_data,
+        }
+    }
+}
+
+/// This wrapper struct exists on the receiver side of the FFI interface, so it has
+/// no guarantees about being able to access the data in `private_data`. Any functions
+/// defined on this struct must only use the stable functions provided in
+/// FFI_SchemaProvider to interact with the foreign schema provider.
+#[derive(Debug)]
+pub struct ForeignSchemaProvider(pub FFI_SchemaProvider);
+
+unsafe impl Send for ForeignSchemaProvider {}
+unsafe impl Sync for ForeignSchemaProvider {}
+
+impl From<&FFI_SchemaProvider> for ForeignSchemaProvider {
+    /// Wraps a clone of `provider`, obtained through its FFI `clone` callback.
+    fn from(provider: &FFI_SchemaProvider) -> Self {
+        Self(provider.clone())
+    }
+}
+
+impl Clone for FFI_SchemaProvider {
+    /// Delegates to the producer's `clone` function pointer.
+    fn clone(&self) -> Self {
+        unsafe { (self.clone)(self) }
+    }
+}
+
+#[async_trait]
+impl SchemaProvider for ForeignSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Returns the owner name captured when the underlying
+    /// [`FFI_SchemaProvider`] was created.
+    fn owner_name(&self) -> Option<&str> {
+        let name: Option<&RString> = self.0.owner_name.as_ref().into();
+        name.map(|s| s.as_str())
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        unsafe {
+            (self.0.table_names)(&self.0)
+                .into_iter()
+                .map(|s| s.into())
+                .collect()
+        }
+    }
+
+    async fn table(
+        &self,
+        name: &str,
+    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
+        let table: Option<FFI_TableProvider> =
+            unsafe { df_result!((self.0.table)(&self.0, name.into()).await)?.into() };
+
+        // We own the returned struct, so move it into the wrapper. Converting
+        // from `&FFI_TableProvider` would have to produce a second owned copy
+        // and then drop (release) this one.
+        Ok(table.map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> Result<Option<Arc<dyn TableProvider>>> {
+        // Tables that already came across the FFI boundary are passed back
+        // directly; local tables are wrapped (without a runtime) for export.
+        let ffi_table = match table.as_any().downcast_ref::<ForeignTableProvider>() {
+            Some(t) => t.0.clone(),
+            None => FFI_TableProvider::new(table, true, None),
+        };
+
+        let returned_provider: Option<FFI_TableProvider> = unsafe {
+            df_result!((self.0.register_table)(&self.0, name.into(), ffi_table))?.into()
+        };
+
+        Ok(returned_provider
+            .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
+    }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+        let returned_provider: Option<FFI_TableProvider> = unsafe {
+            df_result!((self.0.deregister_table)(&self.0, name.into()))?.into()
+        };
+
+        Ok(returned_provider
+            .map(|t| Arc::new(ForeignTableProvider(t)) as Arc<dyn TableProvider>))
+    }
+
+    /// Returns true if table exist in the schema provider, false otherwise.
+    fn table_exist(&self, name: &str) -> bool {
+        unsafe { (self.0.table_exist)(&self.0, name.into()) }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use arrow::datatypes::Schema;
+    use datafusion::{catalog::MemorySchemaProvider, datasource::empty::EmptyTable};
+
+    use super::*;
+
+    /// A table with no columns, good enough for registration bookkeeping.
+    fn empty_table() -> Arc<dyn TableProvider> {
+        let schema = Arc::new(Schema::empty());
+        Arc::new(EmptyTable::new(schema))
+    }
+
+    /// Drives every `SchemaProvider` method through an FFI round trip:
+    /// local schema -> `FFI_SchemaProvider` -> `ForeignSchemaProvider`.
+    #[tokio::test]
+    async fn test_round_trip_ffi_schema_provider() {
+        let local_schema = Arc::new(MemorySchemaProvider::new());
+        let previous = local_schema
+            .as_ref()
+            .register_table("prior_table".to_string(), empty_table())
+            .unwrap();
+        assert!(previous.is_none());
+
+        let ffi_schema_provider = FFI_SchemaProvider::new(local_schema, None);
+        let foreign_schema_provider = ForeignSchemaProvider::from(&ffi_schema_provider);
+
+        assert_eq!(
+            foreign_schema_provider.table_names(),
+            vec!["prior_table".to_string()]
+        );
+
+        // Re-registering an existing table name is rejected with an error.
+        let duplicate = foreign_schema_provider
+            .register_table("prior_table".to_string(), empty_table());
+        assert!(duplicate.is_err());
+        assert_eq!(foreign_schema_provider.table_names().len(), 1);
+
+        // A new name adds a second table.
+        let replaced = foreign_schema_provider
+            .register_table("second_table".to_string(), empty_table())
+            .expect("Unable to register table");
+        assert!(replaced.is_none());
+        assert_eq!(foreign_schema_provider.table_names().len(), 2);
+
+        // Deregistering returns the removed table.
+        let removed = foreign_schema_provider
+            .deregister_table("prior_table")
+            .expect("Unable to deregister table");
+        assert!(removed.is_some());
+        assert_eq!(foreign_schema_provider.table_names().len(), 1);
+
+        // The removed (non-existent) table can no longer be found.
+        let missing = foreign_schema_provider
+            .table("prior_table")
+            .await
+            .expect("Unable to query table");
+        assert!(missing.is_none());
+        assert!(!foreign_schema_provider.table_exist("prior_table"));
+
+        // The remaining table is still reachable.
+        let present = foreign_schema_provider
+            .table("second_table")
+            .await
+            .expect("Unable to query table");
+        assert!(present.is_some());
+        assert!(foreign_schema_provider.table_exist("second_table"));
+    }
+}
diff --git a/datafusion/ffi/src/table_provider.rs
b/datafusion/ffi/src/table_provider.rs
index 0b4080abcb..a7391a8503 100644
--- a/datafusion/ffi/src/table_provider.rs
+++ b/datafusion/ffi/src/table_provider.rs
@@ -382,7 +382,7 @@ impl FFI_TableProvider {
/// defined on this struct must only use the stable functions provided in
/// FFI_TableProvider to interact with the foreign table provider.
#[derive(Debug)]
-pub struct ForeignTableProvider(FFI_TableProvider);
+pub struct ForeignTableProvider(pub FFI_TableProvider);
unsafe impl Send for ForeignTableProvider {}
unsafe impl Sync for ForeignTableProvider {}
diff --git a/datafusion/ffi/src/tests/catalog.rs
b/datafusion/ffi/src/tests/catalog.rs
new file mode 100644
index 0000000000..f4293adb41
--- /dev/null
+++ b/datafusion/ffi/src/tests/catalog.rs
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Fixed catalog and schema providers used by the FFI tests. The
+//! `FixedCatalogProvider` defined here is exported across the FFI boundary as an
+//! [`FFI_CatalogProvider`] (created without a tokio runtime handle) so that the
+//! integration tests can load it from the compiled library, register it in a
+//! `SessionContext`, and query the pre-populated `apple.purchases` table. The
+//! providers deliberately restrict which schema and table names may be
+//! registered, so the tests can also exercise error propagation across the
+//! FFI boundary.
+
+use std::{any::Any, fmt::Debug, sync::Arc};
+
+use crate::catalog_provider::FFI_CatalogProvider;
+use arrow::datatypes::Schema;
+use async_trait::async_trait;
+use datafusion::{
+ catalog::{
+ CatalogProvider, MemoryCatalogProvider, MemorySchemaProvider,
SchemaProvider,
+ TableProvider,
+ },
+ common::exec_err,
+ datasource::MemTable,
+ error::{DataFusionError, Result},
+};
+
+/// Schema provider intended only for unit tests. It starts out holding a single
+/// `purchases` table and rejects registration of any table whose name is not
+/// `sales` or `purchases`.
+#[derive(Debug)]
+pub struct FixedSchemaProvider {
+    /// Storage backing the table registry.
+    inner: MemorySchemaProvider,
+}
+
+/// Builds an in-memory table with nullable `units` (Int32) and `price`
+/// (Float64) columns: one partition holding two batches of 3 and 2 rows.
+pub fn fruit_table() -> Arc<dyn TableProvider + 'static> {
+    use arrow::datatypes::{DataType, Field};
+    use datafusion::common::record_batch;
+
+    let columns = vec![
+        Field::new("units", DataType::Int32, true),
+        Field::new("price", DataType::Float64, true),
+    ];
+    let schema = Arc::new(Schema::new(columns));
+
+    let first_batch = record_batch!(
+        ("units", Int32, vec![10, 20, 30]),
+        ("price", Float64, vec![1.0, 2.0, 5.0])
+    )
+    .unwrap();
+    let second_batch = record_batch!(
+        ("units", Int32, vec![5, 7]),
+        ("price", Float64, vec![1.5, 2.5])
+    )
+    .unwrap();
+
+    let single_partition = vec![first_batch, second_batch];
+    let table = MemTable::try_new(schema, vec![single_partition]).unwrap();
+    Arc::new(table)
+}
+
+impl Default for FixedSchemaProvider {
+    /// Creates the provider with `fruit_table()` registered as `purchases`.
+    /// The inner registry is used directly, bypassing the name filter.
+    fn default() -> Self {
+        let provider = Self {
+            inner: MemorySchemaProvider::new(),
+        };
+
+        let _ = provider
+            .inner
+            .register_table("purchases".to_string(), fruit_table())
+            .unwrap();
+
+        provider
+    }
+}
+
+#[async_trait]
+impl SchemaProvider for FixedSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.inner.table_names()
+    }
+
+    async fn table(
+        &self,
+        name: &str,
+    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
+        self.inner.table(name).await
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.inner.table_exist(name)
+    }
+
+    /// Registers `table` under `name`, failing with an execution error unless
+    /// `name` is `sales` or `purchases`.
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> Result<Option<Arc<dyn TableProvider>>> {
+        if !["sales", "purchases"].contains(&name.as_str()) {
+            return exec_err!(
+                "FixedSchemaProvider only provides two tables: sales and purchases"
+            );
+        }
+
+        self.inner.register_table(name, table)
+    }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+        self.inner.deregister_table(name)
+    }
+}
+
+/// Catalog provider intended only for unit tests. It starts out holding a single
+/// `apple` schema and rejects any schema name other than `apple`, `banana`,
+/// `cherry` or `date`.
+#[derive(Debug)]
+pub struct FixedCatalogProvider {
+    /// Storage backing the schema registry.
+    inner: MemoryCatalogProvider,
+}
+
+impl Default for FixedCatalogProvider {
+    /// Creates the catalog with a default [`FixedSchemaProvider`] registered as
+    /// `apple`. The inner registry is used directly, bypassing the name filter.
+    fn default() -> Self {
+        let inner = MemoryCatalogProvider::new();
+
+        // Registering into a fresh in-memory catalog should not fail; if it
+        // ever does, fail loudly instead of silently discarding the error.
+        inner
+            .register_schema("apple", Arc::new(FixedSchemaProvider::default()))
+            .expect("failed to register the default `apple` schema");
+
+        Self { inner }
+    }
+}
+
+impl CatalogProvider for FixedCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        self.inner.schema_names()
+    }
+
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        self.inner.schema(name)
+    }
+
+    /// Registers `schema` under `name`, failing with an execution error unless
+    /// `name` is one of `apple`, `banana`, `cherry` or `date`.
+    fn register_schema(
+        &self,
+        name: &str,
+        schema: Arc<dyn SchemaProvider>,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        if !["apple", "banana", "cherry", "date"].contains(&name) {
+            return exec_err!(
+                "FixedCatalogProvider only provides four schemas: apple, banana, cherry, date"
+            );
+        }
+
+        self.inner.register_schema(name, schema)
+    }
+
+    fn deregister_schema(
+        &self,
+        name: &str,
+        cascade: bool,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        self.inner.deregister_schema(name, cascade)
+    }
+}
+
+/// FFI entry point: exports a default [`FixedCatalogProvider`] with no tokio
+/// runtime handle attached.
+pub(crate) extern "C" fn create_catalog_provider() -> FFI_CatalogProvider {
+    FFI_CatalogProvider::new(Arc::new(FixedCatalogProvider::default()), None)
+}
diff --git a/datafusion/ffi/src/tests/mod.rs b/datafusion/ffi/src/tests/mod.rs
index 5a471cb8fe..4b4a29276d 100644
--- a/datafusion/ffi/src/tests/mod.rs
+++ b/datafusion/ffi/src/tests/mod.rs
@@ -25,6 +25,9 @@ use abi_stable::{
sabi_types::VersionStrings,
StableAbi,
};
+use catalog::create_catalog_provider;
+
+use crate::catalog_provider::FFI_CatalogProvider;
use super::{table_provider::FFI_TableProvider, udf::FFI_ScalarUDF};
use arrow::array::RecordBatch;
@@ -37,6 +40,7 @@ use sync_provider::create_sync_table_provider;
use udf_udaf_udwf::create_ffi_abs_func;
mod async_provider;
+pub mod catalog;
mod sync_provider;
mod udf_udaf_udwf;
@@ -47,6 +51,9 @@ mod udf_udaf_udwf;
/// both the module loading program and library that implements the
/// module.
pub struct ForeignLibraryModule {
+ /// Construct an opinionated catalog provider
+ pub create_catalog: extern "C" fn() -> FFI_CatalogProvider,
+
/// Constructs the table provider
pub create_table: extern "C" fn(synchronous: bool) -> FFI_TableProvider,
@@ -95,6 +102,7 @@ extern "C" fn construct_table_provider(synchronous: bool) ->
FFI_TableProvider {
/// This defines the entry point for using the module.
pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
ForeignLibraryModule {
+ create_catalog: create_catalog_provider,
create_table: construct_table_provider,
create_scalar_udf: create_ffi_abs_func,
version: super::version,
diff --git a/datafusion/ffi/tests/ffi_integration.rs
b/datafusion/ffi/tests/ffi_integration.rs
index 84e120df42..f610f12c82 100644
--- a/datafusion/ffi/tests/ffi_integration.rs
+++ b/datafusion/ffi/tests/ffi_integration.rs
@@ -25,6 +25,7 @@ mod tests {
use datafusion::error::{DataFusionError, Result};
use datafusion::logical_expr::ScalarUDF;
use datafusion::prelude::{col, SessionContext};
+ use datafusion_ffi::catalog_provider::ForeignCatalogProvider;
use datafusion_ffi::table_provider::ForeignTableProvider;
use datafusion_ffi::tests::{create_record_batch, ForeignLibraryModuleRef};
use datafusion_ffi::udf::ForeignScalarUDF;
@@ -179,4 +180,30 @@ mod tests {
Ok(())
}
+
+    /// Loads the catalog exported by the test library, registers it as `fruit`
+    /// and checks that every row of the pre-populated `apple.purchases` table
+    /// comes back through the FFI boundary.
+    #[tokio::test]
+    async fn test_catalog() -> Result<()> {
+        let module = get_module()?;
+
+        let create_catalog = module.create_catalog().ok_or_else(|| {
+            DataFusionError::NotImplemented(
+                "External catalog provider failed to implement create_catalog"
+                    .to_string(),
+            )
+        })?;
+        let ffi_catalog = create_catalog();
+        let foreign_catalog: ForeignCatalogProvider = (&ffi_catalog).into();
+
+        let ctx = SessionContext::default();
+        let _ = ctx.register_catalog("fruit", Arc::new(foreign_catalog));
+
+        let df = ctx.table("fruit.apple.purchases").await?;
+        let results = df.collect().await?;
+
+        // `fruit_table` holds two batches of 3 and 2 rows.
+        let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
+        assert_eq!(total_rows, 5);
+
+        Ok(())
+    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]