This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bf5811  [datafusion-1269]: forbid creating the table with the same name (#1288)
3bf5811 is described below

commit 3bf5811b9a275b7ce6792867a5c1fa230a880742
Author: Kun Liu <[email protected]>
AuthorDate: Mon Nov 15 19:54:19 2021 +0800

    [datafusion-1269]: forbid creating the table with the same name (#1288)
    
    * create table we should check the schema provider
    
    * fix lint error
    
    * fix comments
    
    * fix test
    
    * change the table exists method
    
    * change test table name
    
    * address comments
    
    * address comments
---
 datafusion/src/catalog/information_schema.rs |  4 +++
 datafusion/src/catalog/schema.rs             | 52 ++++++++++++++++++++++++++--
 datafusion/src/execution/context.rs          |  1 +
 3 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/datafusion/src/catalog/information_schema.rs b/datafusion/src/catalog/information_schema.rs
index cd1e612..ba4ec09 100644
--- a/datafusion/src/catalog/information_schema.rs
+++ b/datafusion/src/catalog/information_schema.rs
@@ -187,6 +187,10 @@ impl SchemaProvider for InformationSchemaProvider {
             None
         }
     }
+
+    fn table_exist(&self, name: &str) -> bool {
+        return matches!(name.to_ascii_lowercase().as_str(), TABLES | COLUMNS);
+    }
 }
 
 /// Builds the `information_schema.TABLE` table row by row
diff --git a/datafusion/src/catalog/schema.rs b/datafusion/src/catalog/schema.rs
index 0e39546..cf754f6 100644
--- a/datafusion/src/catalog/schema.rs
+++ b/datafusion/src/catalog/schema.rs
@@ -18,12 +18,13 @@
 //! Describes the interface and built-in implementations of schemas,
 //! representing collections of named tables.
 
-use crate::datasource::TableProvider;
-use crate::error::{DataFusionError, Result};
 use std::any::Any;
 use std::collections::HashMap;
 use std::sync::{Arc, RwLock};
 
+use crate::datasource::TableProvider;
+use crate::error::{DataFusionError, Result};
+
 /// Represents a schema, comprising a number of named tables.
 pub trait SchemaProvider: Sync + Send {
     /// Returns the schema provider as [`Any`](std::any::Any)
@@ -37,7 +38,7 @@ pub trait SchemaProvider: Sync + Send {
     fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>>;
 
     /// If supported by the implementation, adds a new table to this schema.
-    /// If a table of the same name existed before, it is replaced in the schema and returned.
+    /// If a table of the same name existed before, it returns "Table already exists" error.
     #[allow(unused_variables)]
     fn register_table(
         &self,
@@ -57,6 +58,11 @@ pub trait SchemaProvider: Sync + Send {
             "schema provider does not support deregistering tables".to_owned(),
         ))
     }
+
+    /// If supported by the implementation, checks the table exist in the schema provider or not.
+    /// If no matched table in the schema provider, return false.
+    /// Otherwise, return true.
+    fn table_exist(&self, name: &str) -> bool;
 }
 
 /// Simple in-memory implementation of a schema.
@@ -93,6 +99,12 @@ impl SchemaProvider for MemorySchemaProvider {
         name: String,
         table: Arc<dyn TableProvider>,
     ) -> Result<Option<Arc<dyn TableProvider>>> {
+        if self.table_exist(name.as_str()) {
+            return Err(DataFusionError::Execution(format!(
+                "The table {} already exists",
+                name
+            )));
+        }
         let mut tables = self.tables.write().unwrap();
         Ok(tables.insert(name, table))
     }
@@ -101,4 +113,38 @@ impl SchemaProvider for MemorySchemaProvider {
         let mut tables = self.tables.write().unwrap();
         Ok(tables.remove(name))
     }
+
+    fn table_exist(&self, name: &str) -> bool {
+        let tables = self.tables.read().unwrap();
+        tables.contains_key(name)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use arrow::datatypes::Schema;
+
+    use crate::catalog::schema::{MemorySchemaProvider, SchemaProvider};
+    use crate::datasource::empty::EmptyTable;
+
+    #[tokio::test]
+    async fn test_mem_provider() {
+        let provider = MemorySchemaProvider::new();
+        let table_name = "test_table_exist";
+        assert!(!provider.table_exist(table_name));
+        assert!(provider.deregister_table(table_name).unwrap().is_none());
+        let test_table = EmptyTable::new(Arc::new(Schema::empty()));
+        // register table successfully
+        assert!(provider
+            .register_table(table_name.to_string(), Arc::new(test_table))
+            .unwrap()
+            .is_none());
+        assert!(provider.table_exist(table_name));
+        let other_table = EmptyTable::new(Arc::new(Schema::empty()));
+        let result =
+            provider.register_table(table_name.to_string(), Arc::new(other_table));
+        assert!(result.is_err());
+    }
 }
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index 78a7884..0baed7c 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -2841,6 +2841,7 @@ mod tests {
             let batch =
                 RecordBatch::try_new(schema.clone(), vec![array.clone()]).unwrap();
             let provider = MemTable::try_new(schema, vec![vec![batch]]).unwrap();
+            ctx.deregister_table("t").unwrap();
             ctx.register_table("t", Arc::new(provider)).unwrap();
             let expected = vec![
                 "+-----------+",

Reply via email to