westonpace commented on code in PR #13800: URL: https://github.com/apache/datafusion/pull/13800#discussion_r1890289732
########## datafusion/catalog/src/async.rs: ########## @@ -0,0 +1,764 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion_common::{ + error::{DataFusionError, Result}, + HashMap, TableReference, +}; +use datafusion_execution::config::SessionConfig; + +use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider}; + +/// A schema provider that looks up tables in a cache +/// +/// This is created by the [`AsyncSchemaProvider::resolve`] method +#[derive(Debug)] +struct ResolvedSchemaProvider { + owner_name: Option<String>, + cached_tables: HashMap<String, Arc<dyn TableProvider>>, +} +#[async_trait] +impl SchemaProvider for ResolvedSchemaProvider { + fn owner_name(&self) -> Option<&str> { + self.owner_name.as_deref() + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn table_names(&self) -> Vec<String> { + self.cached_tables.keys().cloned().collect() + } + + async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> { + Ok(self.cached_tables.get(name).cloned()) + } + + #[allow(unused_variables)] + fn register_table( + &self, + name: String, + _table: Arc<dyn TableProvider>, + ) -> Result<Option<Arc<dyn 
TableProvider>>> { + Err(DataFusionError::Execution(format!( + "Attempt to register table '{name}' with ResolvedSchemaProvider which is not supported" + ))) + } + + #[allow(unused_variables)] + fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> { + Err(DataFusionError::Execution(format!("Attempt to deregister table '{name}' with ResolvedSchemaProvider which is not supported"))) + } + + fn table_exist(&self, name: &str) -> bool { + self.cached_tables.contains_key(name) + } +} + +/// Helper class for building a [`ResolvedSchemaProvider`] +struct ResolvedSchemaProviderBuilder { + owner_name: String, + async_provider: Arc<dyn AsyncSchemaProvider>, + cached_tables: HashMap<String, Option<Arc<dyn TableProvider>>>, +} +impl ResolvedSchemaProviderBuilder { + fn new(owner_name: String, async_provider: Arc<dyn AsyncSchemaProvider>) -> Self { + Self { + owner_name, + async_provider, + cached_tables: HashMap::new(), + } + } + + fn finish(self) -> Arc<dyn SchemaProvider> { + let cached_tables = self + .cached_tables + .into_iter() + .filter_map(|(key, maybe_value)| maybe_value.map(|value| (key, value))) + .collect(); + Arc::new(ResolvedSchemaProvider { + owner_name: Some(self.owner_name), + cached_tables, + }) + } +} + +/// A catalog provider that looks up schemas in a cache +/// +/// This is created by the [`AsyncCatalogProvider::resolve`] method +#[derive(Debug)] +struct ResolvedCatalogProvider { + cached_schemas: HashMap<String, Arc<dyn SchemaProvider>>, +} +impl CatalogProvider for ResolvedCatalogProvider { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn schema_names(&self) -> Vec<String> { + self.cached_schemas.keys().cloned().collect() + } + + fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> { + self.cached_schemas.get(name).cloned() + } +} + +/// Helper class for building a [`ResolvedCatalogProvider`] +struct ResolvedCatalogProviderBuilder { + cached_schemas: HashMap<String, Option<ResolvedSchemaProviderBuilder>>, 
+ async_provider: Arc<dyn AsyncCatalogProvider>, +} +impl ResolvedCatalogProviderBuilder { + fn new(async_provider: Arc<dyn AsyncCatalogProvider>) -> Self { + Self { + cached_schemas: HashMap::new(), + async_provider, + } + } + fn finish(self) -> Arc<dyn CatalogProvider> { + let cached_schemas = self + .cached_schemas + .into_iter() + .filter_map(|(key, maybe_value)| { + maybe_value.map(|value| (key, value.finish())) + }) + .collect(); + Arc::new(ResolvedCatalogProvider { cached_schemas }) + } +} + +/// A catalog provider list that looks up catalogs in a cache +/// +/// This is created by the [`AsyncCatalogProviderList::resolve`] method +#[derive(Debug)] +struct ResolvedCatalogProviderList { + cached_catalogs: HashMap<String, Arc<dyn CatalogProvider>>, +} +impl CatalogProviderList for ResolvedCatalogProviderList { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn register_catalog( + &self, + _name: String, + _catalog: Arc<dyn CatalogProvider>, + ) -> Option<Arc<dyn CatalogProvider>> { + unimplemented!("resolved providers cannot handle registration APIs") Review Comment: I'm not sure I understand. Are you suggesting we change the trait to return a `Result`? I would like to avoid making breaking changes to existing traits in this PR. If we really want to avoid these panics, we should probably split the catalog provider traits into a "read-only provider" and a "writable catalog". I suspect there will be many instances where users have read-only catalogs, and such a split would give them compile-time safety instead of runtime safety, though I'd imagine that could be a future PR. I am not too concerned about the panic here because in the long term I don't think users should ever be exposed to the `ResolvedXyz` structs. My long-term goal is for users to just write... ``` struct MyAsyncCatalog { ... } impl AsyncCatalogProvider for MyAsyncCatalog { ... } fn run_my_query(query: &str) { let ctx: SessionContext = ... let catalog: Arc<dyn AsyncCatalogProvider> = ...
// Instantiate MyAsyncCatalog ctx.register_async_catalog(catalog); ctx.sql(...); // The method `resolve` is called in here and the user never has access to the // resolved catalog and so they cannot misuse it by calling register/deregister // methods on it } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org