This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 71f157f8b1 Add example of interacting with a remote catalog (#13722)
71f157f8b1 is described below
commit 71f157f8b10fcf8732c5e74fbdf6b6a488b9aff6
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Dec 19 06:43:00 2024 -0500
Add example of interacting with a remote catalog (#13722)
* Add example of interacting with a remote catalog
* Update datafusion/core/src/execution/session_state.rs
Co-authored-by: Berkay Şahin <[email protected]>
* Apply suggestions from code review
Co-authored-by: Jonah Gao <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
* Use HashMap to hold tables
---------
Co-authored-by: Berkay Şahin <[email protected]>
Co-authored-by: Jonah Gao <[email protected]>
Co-authored-by: Weston Pace <[email protected]>
---
datafusion-examples/README.md | 1 +
datafusion-examples/examples/remote_catalog.rs | 369 +++++++++++++++++++++++++
datafusion/catalog/src/catalog.rs | 14 +-
datafusion/core/src/execution/session_state.rs | 2 +-
4 files changed, 380 insertions(+), 6 deletions(-)
diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index 528e7dd857..aca600e50e 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -77,6 +77,7 @@ cargo run --example dataframe
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure
`object_store` and run a query against files stored in AWS S3
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store`
and run a query against files via HTTP
- [`regexp.rs`](examples/regexp.rs): Examples of using regular expression
functions
+- [`remote_catalog.rs`](examples/remote_catalog.rs): Examples of interfacing with a
remote catalog (e.g. over a network)
- [`simple_udaf.rs`](examples/simple_udaf.rs): Define and invoke a User
Defined Aggregate Function (UDAF)
- [`simple_udf.rs`](examples/simple_udf.rs): Define and invoke a User Defined
Scalar Function (UDF)
- [`simple_udwf.rs`](examples/simple_udwf.rs): Define and invoke a User
Defined Window Function (UDWF)
diff --git a/datafusion-examples/examples/remote_catalog.rs
b/datafusion-examples/examples/remote_catalog.rs
new file mode 100644
index 0000000000..206b7ba9c4
--- /dev/null
+++ b/datafusion-examples/examples/remote_catalog.rs
@@ -0,0 +1,369 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/// This example shows how to implement the DataFusion [`CatalogProvider`] API
+/// for catalogs that are remote (require network access) and/or offer only
+/// asynchronous APIs such as [Polaris], [Unity], and [Hive].
+///
+/// Integrating with these catalogs is a bit more complex than with local
+/// catalogs because calls like `ctx.sql("SELECT * FROM db.schm.tbl")` may need
+/// to perform remote network requests, but many Catalog APIs are synchronous.
+/// See the documentation on [`CatalogProvider`] for more details.
+///
+/// [`CatalogProvider`]: datafusion_catalog::CatalogProvider
+///
+/// [Polaris]: https://github.com/apache/polaris
+/// [Unity]: https://github.com/unitycatalog/unitycatalog
+/// [Hive]: https://hive.apache.org/
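+///
+/// The high level flow, shown in full in `main` below, is sketched here
+/// (names match the example; downcasting and error handling are elided):
+///
+/// ```text
+/// let statement = state.sql_to_statement(sql, dialect)?;         // parse only
+/// let references = state.resolve_table_references(&statement)?;  // find tables
+/// remote_schema.load_tables(table_names).await?;                 // async fetch
+/// let plan = state.statement_to_plan(statement).await?;          // plan as usual
+/// ```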
+use arrow::array::record_batch;
+use arrow_schema::{Field, Fields, Schema, SchemaRef};
+use async_trait::async_trait;
+use datafusion::catalog::{SchemaProvider, TableProvider};
+use datafusion::common::DataFusionError;
+use datafusion::common::Result;
+use datafusion::execution::SendableRecordBatchStream;
+use datafusion::physical_plan::memory::MemoryExec;
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::{DataFrame, SessionContext};
+use datafusion_catalog::Session;
+use datafusion_common::{
+ assert_batches_eq, internal_datafusion_err, plan_err, HashMap, TableReference,
+};
+use datafusion_expr::{Expr, TableType};
+use futures::TryStreamExt;
+use std::any::Any;
+use std::sync::{Arc, Mutex};
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ // As always, we create a session context to interact with DataFusion
+ let ctx = SessionContext::new();
+
+ // Make a connection to the remote catalog, asynchronously, and configure it
+ let remote_catalog_interface = RemoteCatalogInterface::connect().await?;
+
+ // Register a SchemaProvider for tables in a schema named "remote_schema".
+ //
+ // This will let DataFusion query tables such as
+ // `datafusion.remote_schema.remote_table`
+ let remote_schema: Arc<dyn SchemaProvider> =
+ Arc::new(RemoteSchema::new(Arc::new(remote_catalog_interface)));
+ ctx.catalog("datafusion")
+ .ok_or_else(|| internal_datafusion_err!("default catalog was not installed"))?
+ .register_schema("remote_schema", Arc::clone(&remote_schema))?;
+
+ // Here is a query that selects data from a table in the remote catalog.
+ let sql = "SELECT * from remote_schema.remote_table";
+
+ // The `SessionContext::sql` interface is async, but it does not
+ // support asynchronous access to catalogs, so the following query errors.
+ let results = ctx.sql(sql).await;
+ assert_eq!(
+ results.unwrap_err().to_string(),
+ "Error during planning: table 'datafusion.remote_schema.remote_table'
not found"
+ );
+
+ // Instead, to use a remote catalog, we must use lower level APIs on
+ // SessionState (what `SessionContext::sql` does internally).
+ let state = ctx.state();
+
+ // First, parse the SQL (but don't plan it / resolve any table references)
+ let dialect = state.config().options().sql_parser.dialect.as_str();
+ let statement = state.sql_to_statement(sql, dialect)?;
+
+ // Find all `TableReferences` in the parsed query. These correspond to the
+ // tables referred to by the query (in this case
+ // `remote_schema.remote_table`)
+ let references = state.resolve_table_references(&statement)?;
+
+ // Call `load_tables` to load information from the remote catalog for each
+ // of the referenced tables. Best practice is to fetch the information for
+ // all tables required by the query in a single call (rather than one call
+ // per table) to minimize network overhead
+ let table_names = references.iter().filter_map(|r| {
+ if refers_to_schema("datafusion", "remote_schema", r) {
+ Some(r.table())
+ } else {
+ None
+ }
+ });
+ remote_schema
+ .as_any()
+ .downcast_ref::<RemoteSchema>()
+ .expect("correct types")
+ .load_tables(table_names)
+ .await?;
+
+ // Now continue planning the query; having fetched the remote table
+ // information, it can run as normal
+ let plan = state.statement_to_plan(statement).await?;
+ let results = DataFrame::new(state, plan).collect().await?;
+ assert_batches_eq!(
+ [
+ "+----+-------+",
+ "| id | name |",
+ "+----+-------+",
+ "| 1 | alpha |",
+ "| 2 | beta |",
+ "| 3 | gamma |",
+ "+----+-------+",
+ ],
+ &results
+ );
+
+ Ok(())
+}
+
+/// This is an example of an API that interacts with a remote catalog.
+///
+/// Specifically, its APIs are all `async` and thus can not be used by
+/// [`SchemaProvider`] or [`TableProvider`] directly.
+#[derive(Debug)]
+struct RemoteCatalogInterface {}
+
+impl RemoteCatalogInterface {
+ /// Establish a connection to the remote catalog
+ pub async fn connect() -> Result<Self> {
+ // In a real implementation this method might connect to a remote
+ // catalog, validate credentials, cache basic information, etc
+ Ok(Self {})
+ }
+
+ /// Fetches information for a specific table
+ pub async fn table_info(&self, name: &str) -> Result<SchemaRef> {
+ if name != "remote_table" {
+ return plan_err!("Remote table not found: {}", name);
+ }
+
+ // In this example, we'll model a remote table with columns "id" and
+ // "name"
+ //
+ // A real remote catalog would make a network call to fetch this
+ // information from a remote source.
+ let schema = Schema::new(Fields::from(vec![
+ Field::new("id", arrow::datatypes::DataType::Int32, false),
+ Field::new("name", arrow::datatypes::DataType::Utf8, false),
+ ]));
+ Ok(Arc::new(schema))
+ }
+
+ /// Fetches data for a table from a remote data source
+ pub async fn read_data(&self, name: &str) -> Result<SendableRecordBatchStream> {
+ if name != "remote_table" {
+ return plan_err!("Remote table not found: {}", name);
+ }
+
+ // In a real remote catalog this call would likely perform network IO to
+ // open and begin reading from a remote datasource, prefetching
+ // information, etc.
+ //
+ // In this example we are just demonstrating how the API works, so we
+ // simply return some static data as a stream.
+ let batch = record_batch!(
+ ("id", Int32, [1, 2, 3]),
+ ("name", Utf8, ["alpha", "beta", "gamma"])
+ )
+ .unwrap();
+ let schema = batch.schema();
+
+ let stream = futures::stream::iter([Ok(batch)]);
+ Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+ }
+}
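+
+// A usage sketch of the interface above (inside an async context; these are
+// the same calls that `RemoteSchema` and `RemoteTable` make further below):
+//
+//   let iface = RemoteCatalogInterface::connect().await?;
+//   let schema = iface.table_info("remote_table").await?;
+//   let stream = iface.read_data("remote_table").await?;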
+
+/// Implements the DataFusion Catalog API interface for tables
+/// stored in a remote catalog.
+#[derive(Debug)]
+struct RemoteSchema {
+ /// Connection with the remote catalog
+ remote_catalog_interface: Arc<RemoteCatalogInterface>,
+ /// Local cache of tables that have been preloaded from the remote
+ /// catalog
+ tables: Mutex<HashMap<String, Arc<dyn TableProvider>>>,
+}
+
+impl RemoteSchema {
+ /// Create a new RemoteSchema
+ pub fn new(remote_catalog_interface: Arc<RemoteCatalogInterface>) -> Self {
+ Self {
+ remote_catalog_interface,
+ tables: Mutex::new(HashMap::new()),
+ }
+ }
+
+ /// Load information for the specified tables from the remote source into
+ /// the local cached copy.
+ pub async fn load_tables(
+ &self,
+ references: impl IntoIterator<Item = &str>,
+ ) -> Result<()> {
+ for table_name in references {
+ if !self.table_exist(table_name) {
+ // Fetch information about the table from the remote catalog
+ //
+ // Note that a real remote catalog interface could return more
+ // information, but at the minimum, DataFusion requires the
+ // table's schema for planning.
+ let schema = self.remote_catalog_interface.table_info(table_name).await?;
+ let remote_table = RemoteTable::new(
+ Arc::clone(&self.remote_catalog_interface),
+ table_name,
+ schema,
+ );
+
+ // Add the table to our local cached list
+ self.tables
+ .lock()
+ .expect("mutex invalid")
+ .insert(table_name.to_string(), Arc::new(remote_table));
+ };
+ }
+ Ok(())
+ }
+}
+
+/// Implement the DataFusion Catalog API for [`RemoteSchema`]
+#[async_trait]
+impl SchemaProvider for RemoteSchema {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn table_names(&self) -> Vec<String> {
+ // Note this API is not async, so we can't directly call the
+ // RemoteCatalogInterface; instead we use the cached list of loaded tables
+ self.tables
+ .lock()
+ .expect("mutex valid")
+ .keys()
+ .cloned()
+ .collect()
+ }
+
+ // While this API is actually `async` and thus could consult a remote
+ // catalog directly, it is more efficient to use a local cached copy
+ // instead, which is what we model in this example
+ async fn table(
+ &self,
+ name: &str,
+ ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
+ // Look for any pre-loaded tables
+ let table = self
+ .tables
+ .lock()
+ .expect("mutex valid")
+ .get(name)
+ .map(Arc::clone);
+ Ok(table)
+ }
+
+ fn table_exist(&self, name: &str) -> bool {
+ // Look for any pre-loaded tables; note this function is not `async` either,
+ // so it also relies on the cached list of loaded tables
+ self.tables.lock().expect("mutex valid").contains_key(name)
+ }
+}
+
+/// Represents the information about a table retrieved from the remote catalog
+#[derive(Debug)]
+struct RemoteTable {
+ /// Connection to the remote catalog
+ remote_catalog_interface: Arc<RemoteCatalogInterface>,
+ name: String,
+ schema: SchemaRef,
+}
+
+impl RemoteTable {
+ pub fn new(
+ remote_catalog_interface: Arc<RemoteCatalogInterface>,
+ name: impl Into<String>,
+ schema: SchemaRef,
+ ) -> Self {
+ Self {
+ remote_catalog_interface,
+ name: name.into(),
+ schema,
+ }
+ }
+}
+
+/// Implement the DataFusion Catalog API for [`RemoteTable`]
+#[async_trait]
+impl TableProvider for RemoteTable {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &dyn Session,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ // Note that `scan` is called once the plan begins execution, and thus is
+ // async. When interacting with remote data sources, this is the place
+ // to begin establishing the remote connections and interacting with the
+ // remote storage system.
+ //
+ // As this example is just modeling the catalog API interface, we buffer
+ // the results locally in memory for simplicity.
+ let batches = self
+ .remote_catalog_interface
+ .read_data(&self.name)
+ .await?
+ .try_collect()
+ .await?;
+ Ok(Arc::new(MemoryExec::try_new(
+ &[batches],
+ self.schema.clone(),
+ projection.cloned(),
+ )?))
+ }
+}
+
+/// Return true if this `table_reference` might be for a table in the specified
+/// catalog and schema.
+fn refers_to_schema(
+ catalog_name: &str,
+ schema_name: &str,
+ table_reference: &TableReference,
+) -> bool {
+ // Check the references are in the correct catalog and schema
+ // references like foo.bar.baz
+ if let Some(catalog) = table_reference.catalog() {
+ if catalog != catalog_name {
+ return false;
+ }
+ }
+ // references like bar.baz
+ if let Some(schema) = table_reference.schema() {
+ if schema != schema_name {
+ return false;
+ }
+ }
+
+ true
+}
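+
+// A minimal sketch of the matching behavior above, written as a unit test.
+// The table name "t" is illustrative; the `TableReference` constructors come
+// from `datafusion_common`.
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn refers_to_schema_matching() {
+ let matches = |r: &TableReference| refers_to_schema("datafusion", "remote_schema", r);
+ // Fully qualified references must match both catalog and schema
+ assert!(matches(&TableReference::full("datafusion", "remote_schema", "t")));
+ // Partially qualified references need only a matching schema
+ assert!(matches(&TableReference::partial("remote_schema", "t")));
+ // Bare references name neither catalog nor schema, so they might match
+ assert!(matches(&TableReference::bare("t")));
+ // A reference to a different schema never matches
+ assert!(!matches(&TableReference::partial("other_schema", "t")));
+ }
+}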
diff --git a/datafusion/catalog/src/catalog.rs
b/datafusion/catalog/src/catalog.rs
index 85f2dede2f..71b9eccf9d 100644
--- a/datafusion/catalog/src/catalog.rs
+++ b/datafusion/catalog/src/catalog.rs
@@ -52,12 +52,16 @@ use datafusion_common::Result;
///
/// # Implementing "Remote" catalogs
///
+/// See [`remote_catalog`] for an end-to-end example of how to implement a
+/// remote catalog.
+///
/// Sometimes catalog information is stored remotely and requires a network call
/// to retrieve. For example, the [Delta Lake] table format stores table
/// metadata in files on S3 that must be first downloaded to discover what
/// schemas and tables exist.
///
/// [Delta Lake]: https://delta.io/
+/// [`remote_catalog`]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/remote_catalog.rs
///
/// The [`CatalogProvider`] can support this use case, but it takes some care.
/// The planning APIs in DataFusion are not `async` and thus network IO can not
@@ -72,15 +76,15 @@ use datafusion_common::Result;
/// batch access to the remote catalog to retrieve multiple schemas and tables
/// in a single network call.
///
-/// Note that [`SchemaProvider::table`] is an `async` function in order to
+/// Note that [`SchemaProvider::table`] **is** an `async` function in order to
/// simplify implementing simple [`SchemaProvider`]s. For many table formats it
/// is easy to list all available tables but there is additional non trivial
/// access required to read table details (e.g. statistics).
///
/// The pattern that DataFusion itself uses to plan SQL queries is to walk over
-/// the query to find all table references,
-/// performing required remote catalog in parallel, and then plans the query
-/// using that snapshot.
+/// the query to find all table references, perform the required remote catalog
+/// lookups in parallel, store the results in a cached snapshot, and then plan
+/// the query using that snapshot.
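+///
+/// A sketch of that pattern (here `fetch_table` is a hypothetical async
+/// function that looks up one table in the remote catalog):
+///
+/// ```text
+/// let references = state.resolve_table_references(&statement)?;
+/// let providers = futures::future::try_join_all(
+///     references.iter().map(|r| fetch_table(r)),
+/// ).await?;
+/// // ... store `providers` in a snapshot visible to planning, then ...
+/// let plan = state.statement_to_plan(statement).await?;
+/// ```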
///
/// # Example Catalog Implementations
///
@@ -150,7 +154,7 @@ pub trait CatalogProvider: Debug + Sync + Send {
/// Represent a list of named [`CatalogProvider`]s.
///
-/// Please see the documentation on `CatalogProvider` for details of
+/// Please see the documentation on [`CatalogProvider`] for details of
/// implementing a custom catalog.
pub trait CatalogProviderList: Debug + Sync + Send {
/// Returns the catalog list as [`Any`]
diff --git a/datafusion/core/src/execution/session_state.rs
b/datafusion/core/src/execution/session_state.rs
index cef5d4c1ee..ef32e84a73 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -296,7 +296,7 @@ impl SessionState {
}
/// Retrieve the [`SchemaProvider`] for a specific [`TableReference`], if it
- /// esists.
+ /// exists.
pub fn schema_for_ref(
&self,
table_ref: impl Into<TableReference>,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]