This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 3ec0d55 Add Create Schema functionality in SQL (#1959)
3ec0d55 is described below
commit 3ec0d55c736aecef5051311e639836aecbde7f51
Author: Matthew Turner <[email protected]>
AuthorDate: Thu Mar 17 08:39:55 2022 -0400
Add Create Schema functionality in SQL (#1959)
* Start adding Create Schema
* Refactor catalog and schema providers
* Working on catalog and schema registration within ExecutionContext
* Create schema SQL working and added test
* Fix tests
* Ballista updates
* Rebase remainder
---
ballista/rust/core/proto/ballista.proto | 7 ++
ballista/rust/core/src/serde/logical_plan/mod.rs | 30 +++++++-
datafusion/src/catalog/catalog.rs | 28 +++++---
datafusion/src/catalog/information_schema.rs | 9 +++
datafusion/src/catalog/mod.rs | 16 ++++-
datafusion/src/catalog/schema.rs | 1 +
datafusion/src/execution/context.rs | 79 +++++++++++++++++++---
datafusion/src/logical_plan/mod.rs | 6 +-
datafusion/src/logical_plan/plan.rs | 28 +++++++-
.../src/optimizer/common_subexpr_eliminate.rs | 1 +
datafusion/src/optimizer/projection_push_down.rs | 1 +
datafusion/src/optimizer/utils.rs | 3 +-
datafusion/src/physical_plan/planner.rs | 9 +++
datafusion/src/sql/planner.rs | 15 ++--
datafusion/tests/sql/information_schema.rs | 2 +-
15 files changed, 203 insertions(+), 32 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index a835229..5bb1289 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -49,6 +49,7 @@ message LogicalPlanNode {
CrossJoinNode cross_join = 15;
ValuesNode values = 16;
LogicalExtensionNode extension = 17;
+ CreateCatalogSchemaNode create_catalog_schema = 18;
}
}
@@ -146,6 +147,12 @@ message CreateExternalTableNode {
datafusion.DfSchema schema = 5;
}
+// Serialized form of a `CREATE SCHEMA [IF NOT EXISTS] <name>` logical plan.
+message CreateCatalogSchemaNode {
+ // Name of the schema to create.
+ string schema_name = 1;
+ // If true, creating a schema that already exists is not an error.
+ bool if_not_exists = 2;
+ // Output schema of the plan node (the SQL planner uses an empty schema).
+ datafusion.DfSchema schema = 3;
+}
+
// a node containing data for defining values list. unlike in SQL where it's
two dimensional, here
// the list is flattened, and with the field n_cols it can be parsed and
partitioned into rows
message ValuesNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs
b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 7379793..bfc254a 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -33,8 +33,8 @@ use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, Window,
};
use datafusion::logical_plan::{
- Column, CreateExternalTable, CrossJoin, Expr, JoinConstraint, Limit,
LogicalPlan,
- LogicalPlanBuilder, Repartition, TableScan, Values,
+ Column, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr,
JoinConstraint,
+ Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan, Values,
};
use datafusion::prelude::SessionContext;
@@ -322,6 +322,19 @@ impl AsLogicalPlan for LogicalPlanNode {
has_header: create_extern_table.has_header,
}))
}
+ LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
+ let pb_schema =
(create_catalog_schema.schema.clone()).ok_or_else(|| {
+ BallistaError::General(String::from(
+ "Protobuf deserialization error,
CreateCatalogSchemaNode was missing required field schema.",
+ ))
+ })?;
+
+ Ok(LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
+ schema_name: create_catalog_schema.schema_name.clone(),
+ if_not_exists: create_catalog_schema.if_not_exists,
+ schema: pb_schema.try_into()?,
+ }))
+ }
LogicalPlanType::Analyze(analyze) => {
let input: LogicalPlan =
into_logical_plan!(analyze.input, &ctx, extension_codec)?;
@@ -755,6 +768,19 @@ impl AsLogicalPlan for LogicalPlanNode {
)),
})
}
+ LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
+ schema_name,
+ if_not_exists,
+ schema: df_schema,
+ }) => Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::CreateCatalogSchema(
+ protobuf::CreateCatalogSchemaNode {
+ schema_name: schema_name.clone(),
+ if_not_exists: *if_not_exists,
+ schema: Some(df_schema.into()),
+ },
+ )),
+ }),
LogicalPlan::Analyze(a) => {
let input = protobuf::LogicalPlanNode::try_from_logical_plan(
a.input.as_ref(),
diff --git a/datafusion/src/catalog/catalog.rs
b/datafusion/src/catalog/catalog.rs
index d5f509f..35054dc 100644
--- a/datafusion/src/catalog/catalog.rs
+++ b/datafusion/src/catalog/catalog.rs
@@ -108,6 +108,14 @@ pub trait CatalogProvider: Sync + Send {
/// Retrieves a specific schema from the catalog by name, provided it
exists.
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;
+
+ /// Adds a new schema to this catalog.
+ ///
+ /// If a schema of the same name existed before, it is replaced in the
+ /// catalog and the previous provider is returned; otherwise `None` is
+ /// returned.
+ fn register_schema(
+ &self,
+ name: &str,
+ schema: Arc<dyn SchemaProvider>,
+ ) -> Option<Arc<dyn SchemaProvider>>;
}
/// Simple in-memory implementation of a catalog.
@@ -122,17 +130,6 @@ impl MemoryCatalogProvider {
schemas: RwLock::new(HashMap::new()),
}
}
-
- /// Adds a new schema to this catalog.
- /// If a schema of the same name existed before, it is replaced in the
catalog and returned.
- pub fn register_schema(
- &self,
- name: impl Into<String>,
- schema: Arc<dyn SchemaProvider>,
- ) -> Option<Arc<dyn SchemaProvider>> {
- let mut schemas = self.schemas.write();
- schemas.insert(name.into(), schema)
- }
}
impl CatalogProvider for MemoryCatalogProvider {
@@ -149,4 +146,13 @@ impl CatalogProvider for MemoryCatalogProvider {
let schemas = self.schemas.read();
schemas.get(name).cloned()
}
+
+ /// Stores `schema` under `name`, handing back whichever provider was
+ /// previously stored under that name (if any).
+ fn register_schema(
+ &self,
+ name: &str,
+ schema: Arc<dyn SchemaProvider>,
+ ) -> Option<Arc<dyn SchemaProvider>> {
+ let owned_name: String = name.to_owned();
+ self.schemas.write().insert(owned_name, schema)
+ }
}
diff --git a/datafusion/src/catalog/information_schema.rs
b/datafusion/src/catalog/information_schema.rs
index 2fbf825..4a899c9 100644
--- a/datafusion/src/catalog/information_schema.rs
+++ b/datafusion/src/catalog/information_schema.rs
@@ -84,6 +84,15 @@ impl CatalogProvider for CatalogWithInformationSchema {
self.inner.schema(name)
}
}
+
+ /// Forwards the registration to the wrapped `inner` catalog and returns
+ /// whatever that catalog returns.
+ fn register_schema(
+ &self,
+ name: &str,
+ schema: Arc<dyn SchemaProvider>,
+ ) -> Option<Arc<dyn SchemaProvider>> {
+ self.inner.register_schema(name, schema)
+ }
}
/// Implements the `information_schema` virtual schema and tables
diff --git a/datafusion/src/catalog/mod.rs b/datafusion/src/catalog/mod.rs
index 478cdef..031cd87 100644
--- a/datafusion/src/catalog/mod.rs
+++ b/datafusion/src/catalog/mod.rs
@@ -105,7 +105,21 @@ impl<'a> TableReference<'a> {
impl<'a> From<&'a str> for TableReference<'a> {
fn from(s: &'a str) -> Self {
- Self::Bare { table: s }
+ let parts: Vec<&str> = s.split('.').collect();
+
+ match parts.len() {
+ 1 => Self::Bare { table: s },
+ 2 => Self::Partial {
+ schema: parts[0],
+ table: parts[1],
+ },
+ 3 => Self::Full {
+ catalog: parts[0],
+ schema: parts[1],
+ table: parts[2],
+ },
+ _ => Self::Bare { table: s },
+ }
}
}
diff --git a/datafusion/src/catalog/schema.rs b/datafusion/src/catalog/schema.rs
index f5e8b9a..ece61c8 100644
--- a/datafusion/src/catalog/schema.rs
+++ b/datafusion/src/catalog/schema.rs
@@ -245,6 +245,7 @@ mod tests {
use arrow::datatypes::Schema;
use crate::assert_batches_eq;
+ use crate::catalog::catalog::CatalogProvider;
use crate::catalog::catalog::MemoryCatalogProvider;
use crate::catalog::schema::{
MemorySchemaProvider, ObjectStoreSchemaProvider, SchemaProvider,
diff --git a/datafusion/src/execution/context.rs
b/datafusion/src/execution/context.rs
index 1361637..7d8c444 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -59,8 +59,8 @@ use crate::datasource::object_store::{ObjectStore,
ObjectStoreRegistry};
use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
- CreateExternalTable, CreateMemoryTable, DropTable, FunctionRegistry,
LogicalPlan,
- LogicalPlanBuilder, UNNAMED_TABLE,
+ CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, DropTable,
+ FunctionRegistry, LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::optimizer::filter_push_down::FilterPushDown;
@@ -169,7 +169,7 @@ impl SessionContext {
let default_catalog = MemoryCatalogProvider::new();
default_catalog.register_schema(
- config.default_schema.clone(),
+ config.default_schema.as_str(),
Arc::new(MemorySchemaProvider::new()),
);
@@ -256,7 +256,6 @@ impl SessionContext {
} else {
Some(Arc::new(schema.as_ref().to_owned().into()))
};
-
self.register_listing_table(name, location, options,
provided_schema)
.await?;
let plan = LogicalPlanBuilder::empty(false).build()?;
@@ -292,6 +291,39 @@ impl SessionContext {
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
}
+ LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
+ schema_name,
+ if_not_exists,
+ ..
+ }) => {
+ // sqlparser does not accept a database / catalog qualifier on
+ // CREATE SCHEMA, so for now the schema is always created in the
+ // "datafusion" catalog.
+ let default_catalog = "datafusion";
+ let catalog = self.catalog(default_catalog).ok_or_else(|| {
+ DataFusionError::Execution(String::from(
+ "Missing 'datafusion' catalog",
+ ))
+ })?;
+
+ let schema = catalog.schema(&schema_name);
+
+ match (if_not_exists, schema) {
+ // IF NOT EXISTS on an existing schema: nothing to do.
+ (true, Some(_)) => {
+ let plan = LogicalPlanBuilder::empty(false).build()?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ }
+ // Schema is new: register an empty in-memory schema under it.
+ (_, None) => {
+ let schema = Arc::new(MemorySchemaProvider::new());
+ catalog.register_schema(&schema_name, schema);
+ let plan = LogicalPlanBuilder::empty(false).build()?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ }
+ // Use Display (`{}`) so the name is not double-quoted;
+ // `{:?}` rendered `Schema '"abc"' already exists`.
+ (false, Some(_)) => Err(DataFusionError::Execution(format!(
+ "Schema '{}' already exists",
+ schema_name
+ ))),
+ }
+ }
plan => Ok(Arc::new(DataFrame::new(
self.state.clone(),
@@ -541,10 +573,18 @@ impl SessionContext {
let state = self.state.lock();
let catalog = if state.config.information_schema {
- Arc::new(CatalogWithInformationSchema::new(
- Arc::downgrade(&state.catalog_list),
- catalog,
- ))
+ // Only wrap catalogs that do not already expose an
+ // `information_schema`, so the virtual schema is never nested twice
+ // (e.g. when re-registering a catalog obtained from this context).
+ // Inspecting the incoming catalog, rather than a hard-coded
+ // "datafusion" entry, avoids panicking when no such catalog is
+ // registered and still wraps every newly registered catalog.
+ if catalog.schema("information_schema").is_some() {
+ catalog
+ } else {
+ Arc::new(CatalogWithInformationSchema::new(
+ Arc::downgrade(&state.catalog_list),
+ catalog,
+ ))
+ }
} else {
catalog
};
@@ -2994,6 +3034,29 @@ mod tests {
}
#[tokio::test]
+ async fn sql_create_schema() -> Result<()> {
+ // Enable the information schema so the new table can be looked up
+ // through `information_schema.tables` below.
+ let mut ctx = SessionContext::with_config(
+ SessionConfig::new().with_information_schema(true),
+ );
+
+ // Create schema
+ ctx.sql("CREATE SCHEMA abc").await?.collect().await?;
+
+ // Re-creating an existing schema is an error unless IF NOT EXISTS is given
+ assert!(ctx.sql("CREATE SCHEMA abc").await.is_err());
+ ctx.sql("CREATE SCHEMA IF NOT EXISTS abc")
+ .await?
+ .collect()
+ .await?;
+
+ // Add table to schema
+ ctx.sql("CREATE TABLE abc.y AS VALUES (1,2,3)")
+ .await?
+ .collect()
+ .await?;
+
+ // Check table exists in schema
+ let results = ctx
+ .sql(
+ "SELECT * FROM information_schema.tables \
+ WHERE table_schema='abc' AND table_name = 'y'",
+ )
+ .await?
+ .collect()
+ .await?;
+
+ // Count rows across all batches instead of indexing `results[0]`,
+ // which would panic on an empty result.
+ let row_count: usize = results.iter().map(|batch| batch.num_rows()).sum();
+ assert_eq!(row_count, 1);
+ Ok(())
+ }
+
+ #[tokio::test]
async fn normalized_column_identifiers() {
// create local execution context
let mut ctx = SessionContext::new();
diff --git a/datafusion/src/logical_plan/mod.rs
b/datafusion/src/logical_plan/mod.rs
index 9173279..960deb2 100644
--- a/datafusion/src/logical_plan/mod.rs
+++ b/datafusion/src/logical_plan/mod.rs
@@ -63,9 +63,9 @@ pub use expr_visitor::{ExprVisitable, ExpressionVisitor,
Recursion};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{
- CreateExternalTable, CreateMemoryTable, CrossJoin, DropTable,
EmptyRelation,
- JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
PlanVisitor,
- Repartition, TableScan, Union, Values,
+ CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CrossJoin,
DropTable,
+ EmptyRelation, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning,
PlanType,
+ PlanVisitor, Repartition, TableScan, Union, Values,
};
pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use registry::FunctionRegistry;
diff --git a/datafusion/src/logical_plan/plan.rs
b/datafusion/src/logical_plan/plan.rs
index 60d4845..5fc91b6 100644
--- a/datafusion/src/logical_plan/plan.rs
+++ b/datafusion/src/logical_plan/plan.rs
@@ -185,6 +185,17 @@ pub struct CreateExternalTable {
pub has_header: bool,
}
+/// Creates a new schema in a catalog (`CREATE SCHEMA [IF NOT EXISTS] name`).
+#[derive(Clone)]
+pub struct CreateCatalogSchema {
+ /// Name of the schema to create
+ pub schema_name: String,
+ /// If true, do nothing (instead of erroring) when the schema already exists
+ pub if_not_exists: bool,
+ /// Output schema of this plan node; the SQL planner sets it to an empty schema
+ pub schema: DFSchemaRef,
+}
+
/// Drops a table.
#[derive(Clone)]
pub struct DropTable {
@@ -346,6 +357,8 @@ pub enum LogicalPlan {
CreateExternalTable(CreateExternalTable),
/// Creates an in memory table.
CreateMemoryTable(CreateMemoryTable),
+ /// Creates a new catalog schema.
+ CreateCatalogSchema(CreateCatalogSchema),
/// Drops a table.
DropTable(DropTable),
/// Values expression. See
@@ -390,6 +403,9 @@ impl LogicalPlan {
LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) =>
{
input.schema()
}
+ LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, ..
}) => {
+ schema
+ }
LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
}
}
@@ -431,7 +447,8 @@ impl LogicalPlan {
LogicalPlan::Explain(Explain { schema, .. })
| LogicalPlan::Analyze(Analyze { schema, .. })
| LogicalPlan::EmptyRelation(EmptyRelation { schema, .. })
- | LogicalPlan::CreateExternalTable(CreateExternalTable { schema,
.. }) => {
+ | LogicalPlan::CreateExternalTable(CreateExternalTable { schema,
.. })
+ | LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema,
.. }) => {
vec![schema]
}
LogicalPlan::Limit(Limit { input, .. })
@@ -486,6 +503,7 @@ impl LogicalPlan {
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
+ | LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Analyze { .. }
@@ -521,6 +539,7 @@ impl LogicalPlan {
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
| LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::DropTable(_) => vec![],
}
}
@@ -673,6 +692,7 @@ impl LogicalPlan {
| LogicalPlan::EmptyRelation(_)
| LogicalPlan::Values(_)
| LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::DropTable(_) => true,
};
if !recurse {
@@ -1002,6 +1022,12 @@ impl LogicalPlan {
}) => {
write!(f, "CreateMemoryTable: {:?}", name)
}
+ LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
+ schema_name,
+ ..
+ }) => {
+ write!(f, "CreateCatalogSchema: {:?}", schema_name)
+ }
LogicalPlan::DropTable(DropTable {
name, if_exists, ..
}) => {
diff --git a/datafusion/src/optimizer/common_subexpr_eliminate.rs
b/datafusion/src/optimizer/common_subexpr_eliminate.rs
index dad63c3..1c0cb73 100644
--- a/datafusion/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/src/optimizer/common_subexpr_eliminate.rs
@@ -221,6 +221,7 @@ fn optimize(plan: &LogicalPlan, execution_props:
&ExecutionProps) -> Result<Logi
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::CreateMemoryTable(_)
+ | LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::Extension { .. } => {
// apply the optimization to all inputs of the plan
diff --git a/datafusion/src/optimizer/projection_push_down.rs
b/datafusion/src/optimizer/projection_push_down.rs
index 7debb7a..0a0e1c0 100644
--- a/datafusion/src/optimizer/projection_push_down.rs
+++ b/datafusion/src/optimizer/projection_push_down.rs
@@ -442,6 +442,7 @@ fn optimize_plan(
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
+ | LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::DropTable(_)
| LogicalPlan::CrossJoin(_)
| LogicalPlan::Extension { .. } => {
diff --git a/datafusion/src/optimizer/utils.rs
b/datafusion/src/optimizer/utils.rs
index 68e25cd..121f876 100644
--- a/datafusion/src/optimizer/utils.rs
+++ b/datafusion/src/optimizer/utils.rs
@@ -267,7 +267,8 @@ pub fn from_plan(
LogicalPlan::EmptyRelation(_)
| LogicalPlan::TableScan { .. }
| LogicalPlan::CreateExternalTable(_)
- | LogicalPlan::DropTable(_) => {
+ | LogicalPlan::DropTable(_)
+ | LogicalPlan::CreateCatalogSchema(_) => {
// All of these plan types have no inputs / exprs so should not be
called
assert!(expr.is_empty(), "{:?} should have no exprs", plan);
assert!(inputs.is_empty(), "{:?} should have no inputs", plan);
diff --git a/datafusion/src/physical_plan/planner.rs
b/datafusion/src/physical_plan/planner.rs
index 37b3034..afe4160 100644
--- a/datafusion/src/physical_plan/planner.rs
+++ b/datafusion/src/physical_plan/planner.rs
@@ -806,6 +806,15 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan:
CreateExternalTable".to_string(),
))
}
+ LogicalPlan::CreateCatalogSchema(_) => {
+ // There is no default plan for "CREATE SCHEMA".
+ // It must be handled at a higher level (so
+ // that the schema can be registered with
+ // the context)
+ Err(DataFusionError::Internal(
+ "Unsupported logical plan:
CreateCatalogSchema".to_string(),
+ ))
+ }
| LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable
(_) => {
// Create a dummy exec.
Ok(Arc::new(EmptyExec::new(
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index e143c22..8d944ee 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -29,9 +29,9 @@ use crate::logical_plan::window_frames::{WindowFrame,
WindowFrameUnits};
use crate::logical_plan::Expr::Alias;
use crate::logical_plan::{
and, builder::expand_wildcard, col, lit, normalize_col, union_with_alias,
Column,
- CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable,
DFSchema,
- DFSchemaRef, DropTable, Expr, LogicalPlan, LogicalPlanBuilder, Operator,
PlanType,
- ToDFSchema, ToStringifiedPlan,
+ CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable,
+ CreateMemoryTable, DFSchema, DFSchemaRef, DropTable, Expr, LogicalPlan,
+ LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan,
};
use crate::optimizer::utils::exprlist_to_columns;
use crate::prelude::JoinType;
@@ -172,7 +172,14 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
"Only `CREATE TABLE table_name AS SELECT ...` statement is
supported"
.to_string(),
)),
-
+ Statement::CreateSchema {
+ schema_name,
+ if_not_exists,
+ } => Ok(LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
+ schema_name: schema_name.to_string(),
+ if_not_exists,
+ schema: Arc::new(DFSchema::empty()),
+ })),
Statement::Drop {
object_type: ObjectType::Table,
if_exists,
diff --git a/datafusion/tests/sql/information_schema.rs
b/datafusion/tests/sql/information_schema.rs
index 260a1a6..00c691a 100644
--- a/datafusion/tests/sql/information_schema.rs
+++ b/datafusion/tests/sql/information_schema.rs
@@ -18,7 +18,7 @@
use async_trait::async_trait;
use datafusion::{
catalog::{
- catalog::MemoryCatalogProvider,
+ catalog::{CatalogProvider, MemoryCatalogProvider},
schema::{MemorySchemaProvider, SchemaProvider},
},
datasource::{TableProvider, TableType},