This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6132f79 Enhance MemorySchemaProvider to support `register_listing_table` (#1863)
6132f79 is described below
commit 6132f79e38b3019d72f6c1655ffafed59a1c0d03
Author: Matthew Turner <[email protected]>
AuthorDate: Fri Mar 4 14:55:47 2022 -0500
Enhance MemorySchemaProvider to support `register_listing_table` (#1863)
* Add methods to MemorySchemaProvider
* Register object store and listing table
* Small fixes
* Fix test
* Duplicate table test
* Cleanup docs
* Add test with list_file
* clippy
* Create ObjectStoreSchemaProvider and fix test
---
datafusion/src/catalog/schema.rs | 222 ++++++++++++++++++++++++++++++++++++++-
1 file changed, 220 insertions(+), 2 deletions(-)
diff --git a/datafusion/src/catalog/schema.rs b/datafusion/src/catalog/schema.rs
index 60894d1..a97590a 100644
--- a/datafusion/src/catalog/schema.rs
+++ b/datafusion/src/catalog/schema.rs
@@ -18,11 +18,13 @@
//! Describes the interface and built-in implementations of schemas,
//! representing collections of named tables.
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
+use crate::datasource::listing::{ListingTable, ListingTableConfig};
+use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
@@ -127,14 +129,131 @@ impl SchemaProvider for MemorySchemaProvider {
}
}
+/// `ObjectStore` implementation of `SchemaProvider` to enable registering a `ListingTable`
+pub struct ObjectStoreSchemaProvider {
+ tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
+ object_store_registry: Arc<Mutex<ObjectStoreRegistry>>,
+}
+
+impl ObjectStoreSchemaProvider {
+    /// Instantiates a new `ObjectStoreSchemaProvider` with an empty collection of tables.
+ pub fn new() -> Self {
+ Self {
+ tables: RwLock::new(HashMap::new()),
+            object_store_registry: Arc::new(Mutex::new(ObjectStoreRegistry::new())),
+ }
+ }
+
+ /// Assign an `ObjectStore` which enables calling `register_listing_table`.
+ pub fn register_object_store(
+ &self,
+ scheme: impl Into<String>,
+ object_store: Arc<dyn ObjectStore>,
+ ) -> Option<Arc<dyn ObjectStore>> {
+ self.object_store_registry
+ .lock()
+ .register_store(scheme.into(), object_store)
+ }
+
+ /// Retrieves a `ObjectStore` instance by scheme
+ pub fn object_store<'a>(
+ &self,
+ uri: &'a str,
+ ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
+ self.object_store_registry
+ .lock()
+ .get_by_uri(uri)
+ .map_err(DataFusionError::from)
+ }
+
+    /// If supported by the implementation, adds a new table to this schema by creating a
+    /// `ListingTable` from the provided `uri` and a previously registered `ObjectStore`.
+    /// If a table of the same name existed before, it returns "Table already exists" error.
+ pub async fn register_listing_table(
+ &self,
+ name: &str,
+ uri: &str,
+ config: Option<ListingTableConfig>,
+ ) -> Result<()> {
+ let config = match config {
+ Some(cfg) => cfg,
+ None => {
+ let (object_store, _path) = self.object_store(uri)?;
+ ListingTableConfig::new(object_store, uri).infer().await?
+ }
+ };
+
+ let table = Arc::new(ListingTable::try_new(config)?);
+ self.register_table(name.into(), table)?;
+ Ok(())
+ }
+}
+
+impl Default for ObjectStoreSchemaProvider {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl SchemaProvider for ObjectStoreSchemaProvider {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn table_names(&self) -> Vec<String> {
+ let tables = self.tables.read();
+ tables.keys().cloned().collect()
+ }
+
+ fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+ let tables = self.tables.read();
+ tables.get(name).cloned()
+ }
+
+ fn register_table(
+ &self,
+ name: String,
+ table: Arc<dyn TableProvider>,
+ ) -> Result<Option<Arc<dyn TableProvider>>> {
+ if self.table_exist(name.as_str()) {
+ return Err(DataFusionError::Execution(format!(
+ "The table {} already exists",
+ name
+ )));
+ }
+ let mut tables = self.tables.write();
+ Ok(tables.insert(name, table))
+ }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+ let mut tables = self.tables.write();
+ Ok(tables.remove(name))
+ }
+
+ fn table_exist(&self, name: &str) -> bool {
+ let tables = self.tables.read();
+ tables.contains_key(name)
+ }
+}
+
#[cfg(test)]
mod tests {
+ use std::ffi::OsStr;
+ use std::path::Path;
use std::sync::Arc;
use arrow::datatypes::Schema;
- use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
+ use crate::assert_batches_eq;
+ use crate::catalog::catalog::MemoryCatalogProvider;
+ use crate::catalog::schema::{
+ MemorySchemaProvider, ObjectStoreSchemaProvider, SchemaProvider,
+ };
use crate::datasource::empty::EmptyTable;
+ use crate::datasource::object_store::local::LocalFileSystem;
+ use crate::execution::context::ExecutionContext;
+
+ use futures::StreamExt;
#[tokio::test]
async fn test_mem_provider() {
@@ -154,4 +273,103 @@ mod tests {
provider.register_table(table_name.to_string(), Arc::new(other_table));
assert!(result.is_err());
}
+
+ #[tokio::test]
+ async fn test_schema_register_listing_table() {
+ let testdata = crate::test_util::parquet_test_data();
+ let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+
+ let schema = ObjectStoreSchemaProvider::new();
+        let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));
+
+ schema
+ .register_listing_table("alltypes_plain", &filename, None)
+ .await
+ .unwrap();
+
+ let catalog = MemoryCatalogProvider::new();
+ catalog.register_schema("active", Arc::new(schema));
+
+ let mut ctx = ExecutionContext::new();
+
+ ctx.register_catalog("cat", Arc::new(catalog));
+
+ let df = ctx
+ .sql("SELECT id, bool_col FROM cat.active.alltypes_plain")
+ .await
+ .unwrap();
+
+ let actual = df.collect().await.unwrap();
+
+ let expected = vec![
+ "+----+----------+",
+ "| id | bool_col |",
+ "+----+----------+",
+ "| 4 | true |",
+ "| 5 | false |",
+ "| 6 | true |",
+ "| 7 | false |",
+ "| 2 | true |",
+ "| 3 | false |",
+ "| 0 | true |",
+ "| 1 | false |",
+ "+----+----------+",
+ ];
+ assert_batches_eq!(expected, &actual);
+ }
+
+ #[tokio::test]
+ async fn test_schema_register_listing_tables() {
+ let testdata = crate::test_util::parquet_test_data();
+
+ let schema = ObjectStoreSchemaProvider::new();
+ let store = schema
+ .register_object_store("file", Arc::new(LocalFileSystem {}))
+ .unwrap();
+
+ let mut files = store.list_file(&testdata).await.unwrap();
+ while let Some(file) = files.next().await {
+ let sized_file = file.unwrap().sized_file;
+ let path = Path::new(&sized_file.path);
+ let file = path.file_name().unwrap();
+ if file == OsStr::new("alltypes_dictionary.parquet")
+ || file == OsStr::new("alltypes_plain.parquet")
+ {
+ let name = path.file_stem().unwrap().to_str().unwrap();
+ schema
+ .register_listing_table(name, &sized_file.path, None)
+ .await
+ .unwrap();
+ }
+ }
+
+ let tables = vec![
+ String::from("alltypes_dictionary"),
+ String::from("alltypes_plain"),
+ ];
+
+ let mut schema_tables = schema.table_names();
+ schema_tables.sort();
+ assert_eq!(schema_tables, tables);
+ }
+
+ #[tokio::test]
+ #[should_panic(expected = "already exists")]
+ async fn test_schema_register_same_listing_table() {
+ let testdata = crate::test_util::parquet_test_data();
+ let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+
+ let schema = ObjectStoreSchemaProvider::new();
+        let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));
+
+ schema
+ .register_listing_table("alltypes_plain", &filename, None)
+ .await
+ .unwrap();
+
+ schema
+ .register_listing_table("alltypes_plain", &filename, None)
+ .await
+ .unwrap();
+ }
}