This is an automated email from the ASF dual-hosted git repository.

mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f3d1ef23f refactor: `SchemaProvider::table` can fail (#9307)
8f3d1ef23f is described below

commit 8f3d1ef23f93cd4303745eba76c0850b39774d07
Author: Marco Neumann <[email protected]>
AuthorDate: Tue Feb 27 11:23:53 2024 +0100

    refactor: `SchemaProvider::table` can fail (#9307)
---
 datafusion-cli/src/catalog.rs                      | 43 ++++++++++++---------
 .../examples/external_dependency/catalog.rs        |  4 +-
 datafusion/core/src/catalog/information_schema.rs  | 45 +++++++++++++++-------
 datafusion/core/src/catalog/listing_schema.rs      | 10 +++--
 datafusion/core/src/catalog/schema.rs              | 12 ++++--
 datafusion/core/src/execution/context/mod.rs       |  6 +--
 datafusion/core/src/physical_planner.rs            |  4 +-
 datafusion/core/tests/sql/create_drop.rs           |  2 +-
 8 files changed, 80 insertions(+), 46 deletions(-)

diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs
index f664d40df5..67184b8257 100644
--- a/datafusion-cli/src/catalog.rs
+++ b/datafusion-cli/src/catalog.rs
@@ -19,6 +19,7 @@ use crate::object_storage::get_object_store;
 use async_trait::async_trait;
 use datafusion::catalog::schema::SchemaProvider;
 use datafusion::catalog::{CatalogProvider, CatalogProviderList};
+use datafusion::common::{plan_datafusion_err, DataFusionError};
 use datafusion::datasource::listing::{
     ListingTable, ListingTableConfig, ListingTableUrl,
 };
@@ -145,16 +146,21 @@ impl SchemaProvider for DynamicFileSchemaProvider {
         self.inner.register_table(name, table)
     }
 
-    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
-        let inner_table = self.inner.table(name).await;
+    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+        let inner_table = self.inner.table(name).await?;
         if inner_table.is_some() {
-            return inner_table;
+            return Ok(inner_table);
         }
 
         // if the inner schema provider didn't have a table by
         // that name, try to treat it as a listing table
-        let state = self.state.upgrade()?.read().clone();
-        let table_url = ListingTableUrl::parse(name).ok()?;
+        let state = self
+            .state
+            .upgrade()
+            .ok_or_else(|| plan_datafusion_err!("locking error"))?
+            .read()
+            .clone();
+        let table_url = ListingTableUrl::parse(name)?;
         let url: &Url = table_url.as_ref();
 
         // If the store is already registered for this URL then `get_store`
@@ -169,18 +175,20 @@ impl SchemaProvider for DynamicFileSchemaProvider {
                 let mut options = HashMap::new();
                 let store =
                     get_object_store(&state, &mut options, table_url.scheme(), url)
-                        .await
-                        .unwrap();
+                        .await?;
                 state.runtime_env().register_object_store(url, store);
             }
         }
 
-        let config = ListingTableConfig::new(table_url)
-            .infer(&state)
-            .await
-            .ok()?;
+        let config = match ListingTableConfig::new(table_url).infer(&state).await {
+            Ok(cfg) => cfg,
+            Err(_) => {
+                // treat as non-existing
+                return Ok(None);
+            }
+        };
 
-        Some(Arc::new(ListingTable::try_new(config).ok()?))
+        Ok(Some(Arc::new(ListingTable::try_new(config)?)))
     }
 
     fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
@@ -227,7 +235,7 @@ mod tests {
         let (ctx, schema) = setup_context();
 
         // That's a non registered table so expecting None here
-        let table = schema.table(&location).await;
+        let table = schema.table(&location).await.unwrap();
         assert!(table.is_none());
 
         // It should still create an object store for the location in the SessionState
@@ -251,7 +259,7 @@ mod tests {
 
         let (ctx, schema) = setup_context();
 
-        let table = schema.table(&location).await;
+        let table = schema.table(&location).await.unwrap();
         assert!(table.is_none());
 
         let store = ctx
@@ -273,7 +281,7 @@ mod tests {
 
         let (ctx, schema) = setup_context();
 
-        let table = schema.table(&location).await;
+        let table = schema.table(&location).await.unwrap();
         assert!(table.is_none());
 
         let store = ctx
@@ -289,13 +297,10 @@ mod tests {
     }
 
     #[tokio::test]
-    #[should_panic]
     async fn query_invalid_location_test() {
         let location = "ts://file.parquet";
         let (_ctx, schema) = setup_context();
 
-        // This will panic, we cannot prevent that because `schema.table`
-        // returns an Option
-        schema.table(location).await;
+        assert!(schema.table(location).await.is_err());
     }
 }
diff --git a/datafusion-examples/examples/external_dependency/catalog.rs b/datafusion-examples/examples/external_dependency/catalog.rs
index a623eafdf3..28a720cc33 100644
--- a/datafusion-examples/examples/external_dependency/catalog.rs
+++ b/datafusion-examples/examples/external_dependency/catalog.rs
@@ -180,9 +180,9 @@ impl SchemaProvider for DirSchema {
         tables.keys().cloned().collect::<Vec<_>>()
     }
 
-    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+    async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
         let tables = self.tables.read().unwrap();
-        tables.get(name).cloned()
+        Ok(tables.get(name).cloned())
     }
 
     fn table_exist(&self, name: &str) -> bool {
diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs
index 80ce3b1ae4..cd8f764953 100644
--- a/datafusion/core/src/catalog/information_schema.rs
+++ b/datafusion/core/src/catalog/information_schema.rs
@@ -20,6 +20,7 @@
 //! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
 
 use async_trait::async_trait;
+use datafusion_common::DataFusionError;
 use std::{any::Any, sync::Arc};
 
 use arrow::{
@@ -78,7 +79,10 @@ struct InformationSchemaConfig {
 
 impl InformationSchemaConfig {
     /// Construct the `information_schema.tables` virtual table
-    async fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
+    async fn make_tables(
+        &self,
+        builder: &mut InformationSchemaTablesBuilder,
+    ) -> Result<(), DataFusionError> {
         // create a mem table with the names of tables
 
         for catalog_name in self.catalog_list.catalog_names() {
@@ -89,7 +93,7 @@ impl InformationSchemaConfig {
                     // schema name may not exist in the catalog, so we need to check
                     if let Some(schema) = catalog.schema(&schema_name) {
                         for table_name in schema.table_names() {
-                            if let Some(table) = schema.table(&table_name).await {
+                            if let Some(table) = schema.table(&table_name).await? {
                                 builder.add_table(
                                     &catalog_name,
                                     &schema_name,
@@ -124,6 +128,8 @@ impl InformationSchemaConfig {
                 TableType::View,
             );
         }
+
+        Ok(())
     }
 
     async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
@@ -141,7 +147,10 @@ impl InformationSchemaConfig {
         }
     }
 
-    async fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
+    async fn make_views(
+        &self,
+        builder: &mut InformationSchemaViewBuilder,
+    ) -> Result<(), DataFusionError> {
         for catalog_name in self.catalog_list.catalog_names() {
             let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
 
@@ -150,7 +159,7 @@ impl InformationSchemaConfig {
                     // schema name may not exist in the catalog, so we need to check
                     if let Some(schema) = catalog.schema(&schema_name) {
                         for table_name in schema.table_names() {
-                            if let Some(table) = schema.table(&table_name).await {
+                            if let Some(table) = schema.table(&table_name).await? {
                                 builder.add_view(
                                     &catalog_name,
                                     &schema_name,
@@ -163,10 +172,15 @@ impl InformationSchemaConfig {
                 }
             }
         }
+
+        Ok(())
     }
 
     /// Construct the `information_schema.columns` virtual table
-    async fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
+    async fn make_columns(
+        &self,
+        builder: &mut InformationSchemaColumnsBuilder,
+    ) -> Result<(), DataFusionError> {
         for catalog_name in self.catalog_list.catalog_names() {
             let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
 
@@ -175,7 +189,7 @@ impl InformationSchemaConfig {
                     // schema name may not exist in the catalog, so we need to check
                     if let Some(schema) = catalog.schema(&schema_name) {
                         for table_name in schema.table_names() {
-                            if let Some(table) = schema.table(&table_name).await {
+                            if let Some(table) = schema.table(&table_name).await? {
                                 for (field_position, field) in
                                     table.schema().fields().iter().enumerate()
                                 {
@@ -193,6 +207,8 @@ impl InformationSchemaConfig {
                 }
             }
         }
+
+        Ok(())
     }
 
     /// Construct the `information_schema.df_settings` virtual table
@@ -223,7 +239,10 @@ impl SchemaProvider for InformationSchemaProvider {
         ]
     }
 
-    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+    async fn table(
+        &self,
+        name: &str,
+    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
         let config = self.config.clone();
         let table: Arc<dyn PartitionStream> = if name.eq_ignore_ascii_case("tables") {
             Arc::new(InformationSchemaTables::new(config))
@@ -236,12 +255,12 @@ impl SchemaProvider for InformationSchemaProvider {
         } else if name.eq_ignore_ascii_case("schemata") {
             Arc::new(InformationSchemata::new(config))
         } else {
-            return None;
+            return Ok(None);
         };
 
-        Some(Arc::new(
+        Ok(Some(Arc::new(
             StreamingTable::try_new(table.schema().clone(), vec![table]).unwrap(),
-        ))
+        )))
     }
 
     fn table_exist(&self, name: &str) -> bool {
@@ -292,7 +311,7 @@ impl PartitionStream for InformationSchemaTables {
             self.schema.clone(),
             // TODO: Stream this
             futures::stream::once(async move {
-                config.make_tables(&mut builder).await;
+                config.make_tables(&mut builder).await?;
                 Ok(builder.finish())
             }),
         ))
@@ -383,7 +402,7 @@ impl PartitionStream for InformationSchemaViews {
             self.schema.clone(),
             // TODO: Stream this
             futures::stream::once(async move {
-                config.make_views(&mut builder).await;
+                config.make_views(&mut builder).await?;
                 Ok(builder.finish())
             }),
         ))
@@ -497,7 +516,7 @@ impl PartitionStream for InformationSchemaColumns {
             self.schema.clone(),
             // TODO: Stream this
             futures::stream::once(async move {
-                config.make_columns(&mut builder).await;
+                config.make_columns(&mut builder).await?;
                 Ok(builder.finish())
             }),
         ))
diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs
index c3c6826895..f64b43062d 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -175,12 +175,16 @@ impl SchemaProvider for ListingSchemaProvider {
             .collect()
     }
 
-    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
-        self.tables
+    async fn table(
+        &self,
+        name: &str,
+    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
+        Ok(self
+            .tables
             .lock()
             .expect("Can't lock tables")
             .get(name)
-            .cloned()
+            .cloned())
     }
 
     fn register_table(
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 1e9a86b496..49f8350ecc 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -49,7 +49,10 @@ pub trait SchemaProvider: Sync + Send {
 
     /// Retrieves a specific table from the schema by name, if it exists,
     /// otherwise returns `None`.
-    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
+    async fn table(
+        &self,
+        name: &str,
+    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError>;
 
     /// If supported by the implementation, adds a new table named `name` to
     /// this schema.
@@ -111,8 +114,11 @@ impl SchemaProvider for MemorySchemaProvider {
             .collect()
     }
 
-    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
-        self.tables.get(name).map(|table| table.value().clone())
+    async fn table(
+        &self,
+        name: &str,
+    ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
+        Ok(self.tables.get(name).map(|table| table.value().clone()))
     }
 
     fn register_table(
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index b130070141..ffc4a4f717 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -783,7 +783,7 @@ impl SessionContext {
         };
 
         if let Some(schema) = maybe_schema {
-            if let Some(table_provider) = schema.table(&table).await {
+            if let Some(table_provider) = schema.table(&table).await? {
                 if table_provider.table_type() == table_type {
                     schema.deregister_table(&table)?;
                     return Ok(true);
@@ -1115,7 +1115,7 @@ impl SessionContext {
         let table_ref = table_ref.into();
         let table = table_ref.table().to_string();
         let schema = self.state.read().schema_for_ref(table_ref)?;
-        match schema.table(&table).await {
+        match schema.table(&table).await? {
             Some(ref provider) => Ok(Arc::clone(provider)),
             _ => plan_err!("No table named '{table}'"),
         }
@@ -1714,7 +1714,7 @@ impl SessionState {
             let resolved = self.resolve_table_ref(&reference);
             if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) {
                 if let Ok(schema) = self.schema_for_ref(resolved) {
-                    if let Some(table) = schema.table(table).await {
+                    if let Some(table) = schema.table(table).await? {
                         v.insert(provider_as_source(table));
                     }
                 }
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 23ac7e08ca..83ba773464 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -624,7 +624,7 @@ impl DefaultPhysicalPlanner {
                 }) => {
                     let name = table_name.table();
                     let schema = session_state.schema_for_ref(table_name)?;
-                    if let Some(provider) = schema.table(name).await {
+                    if let Some(provider) = schema.table(name).await? {
                         let input_exec = self.create_initial_plan(input, session_state).await?;
                         provider.insert_into(session_state, input_exec, false).await
                     } else {
@@ -641,7 +641,7 @@ impl DefaultPhysicalPlanner {
                 }) => {
                     let name = table_name.table();
                     let schema = session_state.schema_for_ref(table_name)?;
-                    if let Some(provider) = schema.table(name).await {
+                    if let Some(provider) = schema.table(name).await? {
                         let input_exec = self.create_initial_plan(input, session_state).await?;
                         provider.insert_into(session_state, input_exec, true).await
                     } else {
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index b1434dddee..2174009b85 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -63,7 +63,7 @@ async fn create_external_table_with_ddl() -> Result<()> {
     let exists = schema.table_exist("dt");
     assert!(exists, "Table should have been created!");
 
-    let table_schema = schema.table("dt").await.unwrap().schema();
+    let table_schema = schema.table("dt").await.unwrap().unwrap().schema();
 
     assert_eq!(3, table_schema.fields().len());
 

Reply via email to