This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6132f79  Enhance MemorySchemaProvider to support `register_listing_table` (#1863)
6132f79 is described below

commit 6132f79e38b3019d72f6c1655ffafed59a1c0d03
Author: Matthew Turner <[email protected]>
AuthorDate: Fri Mar 4 14:55:47 2022 -0500

    Enhance MemorySchemaProvider to support `register_listing_table` (#1863)
    
    * Add methods to MemorySchemaProvider
    
    * Register object store and listing table
    
    * Small fixes
    
    * Fix test
    
    * Duplicate table test
    
    * Cleanup docs
    
    * Add test with list_file
    
    * clippy
    
    * Create ObjectStoreSchemaProvider and fix test
---
 datafusion/src/catalog/schema.rs | 222 ++++++++++++++++++++++++++++++++++++++-
 1 file changed, 220 insertions(+), 2 deletions(-)

diff --git a/datafusion/src/catalog/schema.rs b/datafusion/src/catalog/schema.rs
index 60894d1..a97590a 100644
--- a/datafusion/src/catalog/schema.rs
+++ b/datafusion/src/catalog/schema.rs
@@ -18,11 +18,13 @@
 //! Describes the interface and built-in implementations of schemas,
 //! representing collections of named tables.
 
-use parking_lot::RwLock;
+use parking_lot::{Mutex, RwLock};
 use std::any::Any;
 use std::collections::HashMap;
 use std::sync::Arc;
 
+use crate::datasource::listing::{ListingTable, ListingTableConfig};
+use crate::datasource::object_store::{ObjectStore, ObjectStoreRegistry};
 use crate::datasource::TableProvider;
 use crate::error::{DataFusionError, Result};
 
@@ -127,14 +129,131 @@ impl SchemaProvider for MemorySchemaProvider {
     }
 }
 
+/// `ObjectStore` implementation of `SchemaProvider` to enable registering a `ListingTable`
+pub struct ObjectStoreSchemaProvider {
+    tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
+    object_store_registry: Arc<Mutex<ObjectStoreRegistry>>,
+}
+
+impl ObjectStoreSchemaProvider {
+    /// Instantiates a new `ObjectStoreSchemaProvider` with an empty collection of tables.
+    pub fn new() -> Self {
+        Self {
+            tables: RwLock::new(HashMap::new()),
+            object_store_registry: Arc::new(Mutex::new(ObjectStoreRegistry::new())),
+        }
+    }
+
+    /// Assign an `ObjectStore` which enables calling `register_listing_table`.
+    pub fn register_object_store(
+        &self,
+        scheme: impl Into<String>,
+        object_store: Arc<dyn ObjectStore>,
+    ) -> Option<Arc<dyn ObjectStore>> {
+        self.object_store_registry
+            .lock()
+            .register_store(scheme.into(), object_store)
+    }
+
+    /// Retrieves a `ObjectStore` instance by scheme
+    pub fn object_store<'a>(
+        &self,
+        uri: &'a str,
+    ) -> Result<(Arc<dyn ObjectStore>, &'a str)> {
+        self.object_store_registry
+            .lock()
+            .get_by_uri(uri)
+            .map_err(DataFusionError::from)
+    }
+
+    /// If supported by the implementation, adds a new table to this schema by creating a
+    /// `ListingTable` from the provided `uri` and a previously registered `ObjectStore`.
+    /// If a table of the same name existed before, it returns "Table already exists" error.
+    pub async fn register_listing_table(
+        &self,
+        name: &str,
+        uri: &str,
+        config: Option<ListingTableConfig>,
+    ) -> Result<()> {
+        let config = match config {
+            Some(cfg) => cfg,
+            None => {
+                let (object_store, _path) = self.object_store(uri)?;
+                ListingTableConfig::new(object_store, uri).infer().await?
+            }
+        };
+
+        let table = Arc::new(ListingTable::try_new(config)?);
+        self.register_table(name.into(), table)?;
+        Ok(())
+    }
+}
+
+impl Default for ObjectStoreSchemaProvider {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SchemaProvider for ObjectStoreSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        let tables = self.tables.read();
+        tables.keys().cloned().collect()
+    }
+
+    fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+        let tables = self.tables.read();
+        tables.get(name).cloned()
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> Result<Option<Arc<dyn TableProvider>>> {
+        if self.table_exist(name.as_str()) {
+            return Err(DataFusionError::Execution(format!(
+                "The table {} already exists",
+                name
+            )));
+        }
+        let mut tables = self.tables.write();
+        Ok(tables.insert(name, table))
+    }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+        let mut tables = self.tables.write();
+        Ok(tables.remove(name))
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        let tables = self.tables.read();
+        tables.contains_key(name)
+    }
+}
+
 #[cfg(test)]
 mod tests {
+    use std::ffi::OsStr;
+    use std::path::Path;
     use std::sync::Arc;
 
     use arrow::datatypes::Schema;
 
-    use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
+    use crate::assert_batches_eq;
+    use crate::catalog::catalog::MemoryCatalogProvider;
+    use crate::catalog::schema::{
+        MemorySchemaProvider, ObjectStoreSchemaProvider, SchemaProvider,
+    };
     use crate::datasource::empty::EmptyTable;
+    use crate::datasource::object_store::local::LocalFileSystem;
+    use crate::execution::context::ExecutionContext;
+
+    use futures::StreamExt;
 
     #[tokio::test]
     async fn test_mem_provider() {
@@ -154,4 +273,103 @@ mod tests {
             provider.register_table(table_name.to_string(), Arc::new(other_table));
         assert!(result.is_err());
     }
+
+    #[tokio::test]
+    async fn test_schema_register_listing_table() {
+        let testdata = crate::test_util::parquet_test_data();
+        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+
+        let schema = ObjectStoreSchemaProvider::new();
+        let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));
+
+        schema
+            .register_listing_table("alltypes_plain", &filename, None)
+            .await
+            .unwrap();
+
+        let catalog = MemoryCatalogProvider::new();
+        catalog.register_schema("active", Arc::new(schema));
+
+        let mut ctx = ExecutionContext::new();
+
+        ctx.register_catalog("cat", Arc::new(catalog));
+
+        let df = ctx
+            .sql("SELECT id, bool_col FROM cat.active.alltypes_plain")
+            .await
+            .unwrap();
+
+        let actual = df.collect().await.unwrap();
+
+        let expected = vec![
+            "+----+----------+",
+            "| id | bool_col |",
+            "+----+----------+",
+            "| 4  | true     |",
+            "| 5  | false    |",
+            "| 6  | true     |",
+            "| 7  | false    |",
+            "| 2  | true     |",
+            "| 3  | false    |",
+            "| 0  | true     |",
+            "| 1  | false    |",
+            "+----+----------+",
+        ];
+        assert_batches_eq!(expected, &actual);
+    }
+
+    #[tokio::test]
+    async fn test_schema_register_listing_tables() {
+        let testdata = crate::test_util::parquet_test_data();
+
+        let schema = ObjectStoreSchemaProvider::new();
+        let store = schema
+            .register_object_store("file", Arc::new(LocalFileSystem {}))
+            .unwrap();
+
+        let mut files = store.list_file(&testdata).await.unwrap();
+        while let Some(file) = files.next().await {
+            let sized_file = file.unwrap().sized_file;
+            let path = Path::new(&sized_file.path);
+            let file = path.file_name().unwrap();
+            if file == OsStr::new("alltypes_dictionary.parquet")
+                || file == OsStr::new("alltypes_plain.parquet")
+            {
+                let name = path.file_stem().unwrap().to_str().unwrap();
+                schema
+                    .register_listing_table(name, &sized_file.path, None)
+                    .await
+                    .unwrap();
+            }
+        }
+
+        let tables = vec![
+            String::from("alltypes_dictionary"),
+            String::from("alltypes_plain"),
+        ];
+
+        let mut schema_tables = schema.table_names();
+        schema_tables.sort();
+        assert_eq!(schema_tables, tables);
+    }
+
+    #[tokio::test]
+    #[should_panic(expected = "already exists")]
+    async fn test_schema_register_same_listing_table() {
+        let testdata = crate::test_util::parquet_test_data();
+        let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
+
+        let schema = ObjectStoreSchemaProvider::new();
+        let _store = schema.register_object_store("test", Arc::new(LocalFileSystem {}));
+
+        schema
+            .register_listing_table("alltypes_plain", &filename, None)
+            .await
+            .unwrap();
+
+        schema
+            .register_listing_table("alltypes_plain", &filename, None)
+            .await
+            .unwrap();
+    }
 }

Reply via email to