This is an automated email from the ASF dual-hosted git repository.
mneumann pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 8f3d1ef23f refactor: `SchemaProvider::table` can fail (#9307)
8f3d1ef23f is described below
commit 8f3d1ef23f93cd4303745eba76c0850b39774d07
Author: Marco Neumann <[email protected]>
AuthorDate: Tue Feb 27 11:23:53 2024 +0100
refactor: `SchemaProvider::table` can fail (#9307)
---
datafusion-cli/src/catalog.rs | 43 ++++++++++++---------
.../examples/external_dependency/catalog.rs | 4 +-
datafusion/core/src/catalog/information_schema.rs | 45 +++++++++++++++-------
datafusion/core/src/catalog/listing_schema.rs | 10 +++--
datafusion/core/src/catalog/schema.rs | 12 ++++--
datafusion/core/src/execution/context/mod.rs | 6 +--
datafusion/core/src/physical_planner.rs | 4 +-
datafusion/core/tests/sql/create_drop.rs | 2 +-
8 files changed, 80 insertions(+), 46 deletions(-)
diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs
index f664d40df5..67184b8257 100644
--- a/datafusion-cli/src/catalog.rs
+++ b/datafusion-cli/src/catalog.rs
@@ -19,6 +19,7 @@ use crate::object_storage::get_object_store;
use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
+use datafusion::common::{plan_datafusion_err, DataFusionError};
use datafusion::datasource::listing::{
ListingTable, ListingTableConfig, ListingTableUrl,
};
@@ -145,16 +146,21 @@ impl SchemaProvider for DynamicFileSchemaProvider {
self.inner.register_table(name, table)
}
- async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
- let inner_table = self.inner.table(name).await;
+ async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+ let inner_table = self.inner.table(name).await?;
if inner_table.is_some() {
- return inner_table;
+ return Ok(inner_table);
}
// if the inner schema provider didn't have a table by
// that name, try to treat it as a listing table
- let state = self.state.upgrade()?.read().clone();
- let table_url = ListingTableUrl::parse(name).ok()?;
+ let state = self
+ .state
+ .upgrade()
+ .ok_or_else(|| plan_datafusion_err!("locking error"))?
+ .read()
+ .clone();
+ let table_url = ListingTableUrl::parse(name)?;
let url: &Url = table_url.as_ref();
// If the store is already registered for this URL then `get_store`
@@ -169,18 +175,20 @@ impl SchemaProvider for DynamicFileSchemaProvider {
let mut options = HashMap::new();
let store =
get_object_store(&state, &mut options, table_url.scheme(), url)
- .await
- .unwrap();
+ .await?;
state.runtime_env().register_object_store(url, store);
}
}
- let config = ListingTableConfig::new(table_url)
- .infer(&state)
- .await
- .ok()?;
+ let config = match ListingTableConfig::new(table_url).infer(&state).await {
+ Ok(cfg) => cfg,
+ Err(_) => {
+ // treat as non-existing
+ return Ok(None);
+ }
+ };
- Some(Arc::new(ListingTable::try_new(config).ok()?))
+ Ok(Some(Arc::new(ListingTable::try_new(config)?)))
}
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
@@ -227,7 +235,7 @@ mod tests {
let (ctx, schema) = setup_context();
// That's a non registered table so expecting None here
- let table = schema.table(&location).await;
+ let table = schema.table(&location).await.unwrap();
assert!(table.is_none());
// It should still create an object store for the location in the SessionState
@@ -251,7 +259,7 @@ mod tests {
let (ctx, schema) = setup_context();
- let table = schema.table(&location).await;
+ let table = schema.table(&location).await.unwrap();
assert!(table.is_none());
let store = ctx
@@ -273,7 +281,7 @@ mod tests {
let (ctx, schema) = setup_context();
- let table = schema.table(&location).await;
+ let table = schema.table(&location).await.unwrap();
assert!(table.is_none());
let store = ctx
@@ -289,13 +297,10 @@ mod tests {
}
#[tokio::test]
- #[should_panic]
async fn query_invalid_location_test() {
let location = "ts://file.parquet";
let (_ctx, schema) = setup_context();
- // This will panic, we cannot prevent that because `schema.table`
- // returns an Option
- schema.table(location).await;
+ assert!(schema.table(location).await.is_err());
}
}
diff --git a/datafusion-examples/examples/external_dependency/catalog.rs b/datafusion-examples/examples/external_dependency/catalog.rs
index a623eafdf3..28a720cc33 100644
--- a/datafusion-examples/examples/external_dependency/catalog.rs
+++ b/datafusion-examples/examples/external_dependency/catalog.rs
@@ -180,9 +180,9 @@ impl SchemaProvider for DirSchema {
tables.keys().cloned().collect::<Vec<_>>()
}
- async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+ async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let tables = self.tables.read().unwrap();
- tables.get(name).cloned()
+ Ok(tables.get(name).cloned())
}
fn table_exist(&self, name: &str) -> bool {
diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs
index 80ce3b1ae4..cd8f764953 100644
--- a/datafusion/core/src/catalog/information_schema.rs
+++ b/datafusion/core/src/catalog/information_schema.rs
@@ -20,6 +20,7 @@
//! [Information Schema]: https://en.wikipedia.org/wiki/Information_schema
use async_trait::async_trait;
+use datafusion_common::DataFusionError;
use std::{any::Any, sync::Arc};
use arrow::{
@@ -78,7 +79,10 @@ struct InformationSchemaConfig {
impl InformationSchemaConfig {
/// Construct the `information_schema.tables` virtual table
- async fn make_tables(&self, builder: &mut InformationSchemaTablesBuilder) {
+ async fn make_tables(
+ &self,
+ builder: &mut InformationSchemaTablesBuilder,
+ ) -> Result<(), DataFusionError> {
// create a mem table with the names of tables
for catalog_name in self.catalog_list.catalog_names() {
@@ -89,7 +93,7 @@ impl InformationSchemaConfig {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
- if let Some(table) = schema.table(&table_name).await {
+ if let Some(table) = schema.table(&table_name).await? {
builder.add_table(
&catalog_name,
&schema_name,
@@ -124,6 +128,8 @@ impl InformationSchemaConfig {
TableType::View,
);
}
+
+ Ok(())
}
async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
@@ -141,7 +147,10 @@ impl InformationSchemaConfig {
}
}
- async fn make_views(&self, builder: &mut InformationSchemaViewBuilder) {
+ async fn make_views(
+ &self,
+ builder: &mut InformationSchemaViewBuilder,
+ ) -> Result<(), DataFusionError> {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
@@ -150,7 +159,7 @@ impl InformationSchemaConfig {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
- if let Some(table) = schema.table(&table_name).await {
+ if let Some(table) = schema.table(&table_name).await? {
builder.add_view(
&catalog_name,
&schema_name,
@@ -163,10 +172,15 @@ impl InformationSchemaConfig {
}
}
}
+
+ Ok(())
}
/// Construct the `information_schema.columns` virtual table
- async fn make_columns(&self, builder: &mut InformationSchemaColumnsBuilder) {
+ async fn make_columns(
+ &self,
+ builder: &mut InformationSchemaColumnsBuilder,
+ ) -> Result<(), DataFusionError> {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();
@@ -175,7 +189,7 @@ impl InformationSchemaConfig {
// schema name may not exist in the catalog, so we need to check
if let Some(schema) = catalog.schema(&schema_name) {
for table_name in schema.table_names() {
- if let Some(table) = schema.table(&table_name).await {
+ if let Some(table) = schema.table(&table_name).await? {
for (field_position, field) in
table.schema().fields().iter().enumerate()
{
@@ -193,6 +207,8 @@ impl InformationSchemaConfig {
}
}
}
+
+ Ok(())
}
/// Construct the `information_schema.df_settings` virtual table
@@ -223,7 +239,10 @@ impl SchemaProvider for InformationSchemaProvider {
]
}
- async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+ async fn table(
+ &self,
+ name: &str,
+ ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
let config = self.config.clone();
let table: Arc<dyn PartitionStream> = if name.eq_ignore_ascii_case("tables") {
Arc::new(InformationSchemaTables::new(config))
@@ -236,12 +255,12 @@ impl SchemaProvider for InformationSchemaProvider {
} else if name.eq_ignore_ascii_case("schemata") {
Arc::new(InformationSchemata::new(config))
} else {
- return None;
+ return Ok(None);
};
- Some(Arc::new(
+ Ok(Some(Arc::new(
StreamingTable::try_new(table.schema().clone(), vec![table]).unwrap(),
- ))
+ )))
}
fn table_exist(&self, name: &str) -> bool {
@@ -292,7 +311,7 @@ impl PartitionStream for InformationSchemaTables {
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
- config.make_tables(&mut builder).await;
+ config.make_tables(&mut builder).await?;
Ok(builder.finish())
}),
))
@@ -383,7 +402,7 @@ impl PartitionStream for InformationSchemaViews {
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
- config.make_views(&mut builder).await;
+ config.make_views(&mut builder).await?;
Ok(builder.finish())
}),
))
@@ -497,7 +516,7 @@ impl PartitionStream for InformationSchemaColumns {
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
- config.make_columns(&mut builder).await;
+ config.make_columns(&mut builder).await?;
Ok(builder.finish())
}),
))
diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs
index c3c6826895..f64b43062d 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -175,12 +175,16 @@ impl SchemaProvider for ListingSchemaProvider {
.collect()
}
- async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
- self.tables
+ async fn table(
+ &self,
+ name: &str,
+ ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
+ Ok(self
+ .tables
.lock()
.expect("Can't lock tables")
.get(name)
- .cloned()
+ .cloned())
}
fn register_table(
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 1e9a86b496..49f8350ecc 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -49,7 +49,10 @@ pub trait SchemaProvider: Sync + Send {
/// Retrieves a specific table from the schema by name, if it exists,
/// otherwise returns `None`.
- async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
+ async fn table(
+ &self,
+ name: &str,
+ ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError>;
/// If supported by the implementation, adds a new table named `name` to
/// this schema.
@@ -111,8 +114,11 @@ impl SchemaProvider for MemorySchemaProvider {
.collect()
}
- async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
- self.tables.get(name).map(|table| table.value().clone())
+ async fn table(
+ &self,
+ name: &str,
+ ) -> Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
+ Ok(self.tables.get(name).map(|table| table.value().clone()))
}
fn register_table(
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index b130070141..ffc4a4f717 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -783,7 +783,7 @@ impl SessionContext {
};
if let Some(schema) = maybe_schema {
- if let Some(table_provider) = schema.table(&table).await {
+ if let Some(table_provider) = schema.table(&table).await? {
if table_provider.table_type() == table_type {
schema.deregister_table(&table)?;
return Ok(true);
@@ -1115,7 +1115,7 @@ impl SessionContext {
let table_ref = table_ref.into();
let table = table_ref.table().to_string();
let schema = self.state.read().schema_for_ref(table_ref)?;
- match schema.table(&table).await {
+ match schema.table(&table).await? {
Some(ref provider) => Ok(Arc::clone(provider)),
_ => plan_err!("No table named '{table}'"),
}
@@ -1714,7 +1714,7 @@ impl SessionState {
let resolved = self.resolve_table_ref(&reference);
if let Entry::Vacant(v) = provider.tables.entry(resolved.to_string()) {
if let Ok(schema) = self.schema_for_ref(resolved) {
- if let Some(table) = schema.table(table).await {
+ if let Some(table) = schema.table(table).await? {
v.insert(provider_as_source(table));
}
}
diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 23ac7e08ca..83ba773464 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -624,7 +624,7 @@ impl DefaultPhysicalPlanner {
}) => {
let name = table_name.table();
let schema = session_state.schema_for_ref(table_name)?;
- if let Some(provider) = schema.table(name).await {
+ if let Some(provider) = schema.table(name).await? {
let input_exec = self.create_initial_plan(input, session_state).await?;
provider.insert_into(session_state, input_exec, false).await
} else {
@@ -641,7 +641,7 @@ impl DefaultPhysicalPlanner {
}) => {
let name = table_name.table();
let schema = session_state.schema_for_ref(table_name)?;
- if let Some(provider) = schema.table(name).await {
+ if let Some(provider) = schema.table(name).await? {
let input_exec = self.create_initial_plan(input, session_state).await?;
provider.insert_into(session_state, input_exec, true).await
} else {
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index b1434dddee..2174009b85 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -63,7 +63,7 @@ async fn create_external_table_with_ddl() -> Result<()> {
let exists = schema.table_exist("dt");
assert!(exists, "Table should have been created!");
- let table_schema = schema.table("dt").await.unwrap().schema();
+ let table_schema = schema.table("dt").await.unwrap().unwrap().schema();
assert_eq!(3, table_schema.fields().len());