This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 19d937a0f Add `CREATE VIEW` (#2279)
19d937a0f is described below
commit 19d937a0f3c99be361df4bc912cc340a6196634a
Author: Matthew Turner <[email protected]>
AuthorDate: Wed May 11 12:03:27 2022 -0400
Add `CREATE VIEW` (#2279)
* Initial commit
* First passing test
* Add OR REPLACE and more tests
* Update doc comment
* More tests
* Add CreateView to Ballista
* Include Q15 for TPCH
* Ignore q15
* Delete view physical plan
---
ballista/rust/core/proto/ballista.proto | 7 +
ballista/rust/core/src/serde/logical_plan/mod.rs | 33 +-
datafusion-examples/examples/custom_datasource.rs | 6 +-
datafusion/core/src/datasource/datasource.rs | 6 +-
datafusion/core/src/datasource/empty.rs | 6 +-
datafusion/core/src/datasource/listing/table.rs | 6 +-
datafusion/core/src/datasource/memory.rs | 6 +-
datafusion/core/src/datasource/mod.rs | 2 +
datafusion/core/src/datasource/view.rs | 364 +++++++++++++++++++++
datafusion/core/src/execution/context.rs | 36 +-
datafusion/core/src/logical_plan/mod.rs | 7 +-
datafusion/core/src/logical_plan/plan.rs | 4 +-
.../core/src/optimizer/common_subexpr_eliminate.rs | 1 +
datafusion/core/src/optimizer/filter_push_down.rs | 6 +-
.../core/src/optimizer/projection_push_down.rs | 1 +
datafusion/core/src/optimizer/utils.rs | 13 +-
datafusion/core/src/physical_plan/planner.rs | 2 +-
datafusion/core/src/sql/planner.rs | 19 +-
datafusion/core/tests/custom_sources.rs | 9 +-
datafusion/core/tests/provider_filter_pushdown.rs | 6 +-
datafusion/core/tests/statistics.rs | 6 +-
datafusion/expr/src/logical_plan/mod.rs | 8 +-
datafusion/expr/src/logical_plan/plan.rs | 29 +-
23 files changed, 547 insertions(+), 36 deletions(-)
diff --git a/ballista/rust/core/proto/ballista.proto
b/ballista/rust/core/proto/ballista.proto
index 1e7901403..e7821dc10 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -53,6 +53,7 @@ message LogicalPlanNode {
UnionNode union = 19;
CreateCatalogNode create_catalog = 20;
SubqueryAliasNode subquery_alias = 21;
+ CreateViewNode create_view = 22;
}
}
@@ -171,6 +172,12 @@ message CreateCatalogNode {
datafusion.DfSchema schema = 3;
}
+message CreateViewNode {
+ string name = 1;
+ LogicalPlanNode input = 2;
+ bool or_replace = 3;
+}
+
// a node containing data for defining values list. unlike in SQL where it's
two dimensional, here
// the list is flattened, and with the field n_cols it can be parsed and
partitioned into rows
message ValuesNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs
b/ballista/rust/core/src/serde/logical_plan/mod.rs
index 5307aff65..e6d9e6289 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -33,8 +33,8 @@ use datafusion::logical_plan::plan::{
};
use datafusion::logical_plan::{
source_as_provider, Column, CreateCatalog, CreateCatalogSchema,
CreateExternalTable,
- CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder,
Repartition,
- TableScan, Values,
+ CreateView, CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan,
LogicalPlanBuilder,
+ Repartition, TableScan, Values,
};
use datafusion::prelude::SessionContext;
@@ -334,6 +334,19 @@ impl AsLogicalPlan for LogicalPlanNode {
if_not_exists: create_extern_table.if_not_exists,
}))
}
+ LogicalPlanType::CreateView(create_view) => {
+ let plan = create_view
+ .input.clone().ok_or_else(||
BallistaError::General(String::from(
+ "Protobuf deserialization error, CreateViewNode has
invalid LogicalPlan input.",
+ )))?
+ .try_into_logical_plan(ctx, extension_codec)?;
+
+ Ok(LogicalPlan::CreateView(CreateView {
+ name: create_view.name.clone(),
+ input: Arc::new(plan),
+ or_replace: create_view.or_replace,
+ }))
+ }
LogicalPlanType::CreateCatalogSchema(create_catalog_schema) => {
let pb_schema =
(create_catalog_schema.schema.clone()).ok_or_else(|| {
BallistaError::General(String::from(
@@ -851,6 +864,22 @@ impl AsLogicalPlan for LogicalPlanNode {
)),
})
}
+ LogicalPlan::CreateView(CreateView {
+ name,
+ input,
+ or_replace,
+ }) => Ok(protobuf::LogicalPlanNode {
+ logical_plan_type: Some(LogicalPlanType::CreateView(Box::new(
+ protobuf::CreateViewNode {
+ name: name.clone(),
+ input:
Some(Box::new(LogicalPlanNode::try_from_logical_plan(
+ input,
+ extension_codec,
+ )?)),
+ or_replace: *or_replace,
+ },
+ ))),
+ }),
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
schema_name,
if_not_exists,
diff --git a/datafusion-examples/examples/custom_datasource.rs
b/datafusion-examples/examples/custom_datasource.rs
index d8a908986..a9a8ef7aa 100644
--- a/datafusion-examples/examples/custom_datasource.rs
+++ b/datafusion-examples/examples/custom_datasource.rs
@@ -20,7 +20,7 @@ use datafusion::arrow::array::{UInt64Builder, UInt8Builder};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::dataframe::DataFrame;
-use datafusion::datasource::TableProvider;
+use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::logical_plan::{Expr, LogicalPlanBuilder};
@@ -165,6 +165,10 @@ impl TableProvider for CustomDataSource {
]))
}
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
async fn scan(
&self,
projection: &Option<Vec<usize>>,
diff --git a/datafusion/core/src/datasource/datasource.rs
b/datafusion/core/src/datasource/datasource.rs
index f4fdc975d..8ab254525 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/core/src/datasource/datasource.rs
@@ -21,7 +21,7 @@ use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
-use datafusion_expr::{TableProviderFilterPushDown, TableType};
+pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
@@ -39,9 +39,7 @@ pub trait TableProvider: Sync + Send {
fn schema(&self) -> SchemaRef;
/// Get the type of this table for metadata/catalog purposes.
- fn table_type(&self) -> TableType {
- TableType::Base
- }
+ fn table_type(&self) -> TableType;
/// Create an ExecutionPlan that will scan the table.
/// The table provider will be usually responsible of grouping
diff --git a/datafusion/core/src/datasource/empty.rs
b/datafusion/core/src/datasource/empty.rs
index 5622d15a0..837cd7704 100644
--- a/datafusion/core/src/datasource/empty.rs
+++ b/datafusion/core/src/datasource/empty.rs
@@ -23,7 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::*;
use async_trait::async_trait;
-use crate::datasource::TableProvider;
+use crate::datasource::{TableProvider, TableType};
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::project_schema;
@@ -51,6 +51,10 @@ impl TableProvider for EmptyTable {
self.schema.clone()
}
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
async fn scan(
&self,
projection: &Option<Vec<usize>>,
diff --git a/datafusion/core/src/datasource/listing/table.rs
b/datafusion/core/src/datasource/listing/table.rs
index 6881f674b..bde88b659 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -28,7 +28,7 @@ use crate::datasource::{
avro::AvroFormat, csv::CsvFormat, json::JsonFormat,
parquet::ParquetFormat,
FileFormat,
},
- get_statistics_with_limit, TableProvider,
+ get_statistics_with_limit, TableProvider, TableType,
};
use crate::logical_expr::TableProviderFilterPushDown;
use crate::{
@@ -298,6 +298,10 @@ impl TableProvider for ListingTable {
Arc::clone(&self.table_schema)
}
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
async fn scan(
&self,
projection: &Option<Vec<usize>>,
diff --git a/datafusion/core/src/datasource/memory.rs
b/datafusion/core/src/datasource/memory.rs
index 72630b39c..adc26d2f4 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -27,7 +27,7 @@ use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
-use crate::datasource::TableProvider;
+use crate::datasource::{TableProvider, TableType};
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::logical_plan::Expr;
@@ -127,6 +127,10 @@ impl TableProvider for MemTable {
self.schema.clone()
}
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
async fn scan(
&self,
projection: &Option<Vec<usize>>,
diff --git a/datafusion/core/src/datasource/mod.rs
b/datafusion/core/src/datasource/mod.rs
index f4d059e3d..f3cc0a04e 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -24,12 +24,14 @@ pub mod file_format;
pub mod listing;
pub mod memory;
pub mod object_store_registry;
+pub mod view;
use futures::Stream;
pub use self::datasource::TableProvider;
use self::listing::PartitionedFile;
pub use self::memory::MemTable;
+pub use self::view::ViewTable;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
pub use crate::logical_expr::TableType;
diff --git a/datafusion/core/src/datasource/view.rs
b/datafusion/core/src/datasource/view.rs
new file mode 100644
index 000000000..2bb3b687b
--- /dev/null
+++ b/datafusion/core/src/datasource/view.rs
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! View data source which uses a LogicalPlan as its input.
+
+use std::{any::Any, sync::Arc};
+
+use arrow::datatypes::SchemaRef;
+use async_trait::async_trait;
+
+use crate::{
+ error::Result,
+ execution::context::SessionContext,
+ logical_plan::{Expr, LogicalPlan},
+ physical_plan::ExecutionPlan,
+};
+
+use crate::datasource::{TableProvider, TableType};
+
+/// An implementation of `TableProvider` that uses another logical plan.
+pub struct ViewTable {
+ /// To create ExecutionPlan
+ context: SessionContext,
+ /// LogicalPlan of the view
+ logical_plan: LogicalPlan,
+    /// Schema of the view, derived from the logical plan's schema
+ table_schema: SchemaRef,
+}
+
+impl ViewTable {
+ /// Create new view that is executed at query runtime.
+ /// Takes a `LogicalPlan` as input.
+ pub fn try_new(context: SessionContext, logical_plan: LogicalPlan) ->
Result<Self> {
+ let table_schema = logical_plan.schema().as_ref().to_owned().into();
+
+ let view = Self {
+ context,
+ logical_plan,
+ table_schema,
+ };
+
+ Ok(view)
+ }
+}
+
+#[async_trait]
+impl TableProvider for ViewTable {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ Arc::clone(&self.table_schema)
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::View
+ }
+
+ async fn scan(
+ &self,
+ _projection: &Option<Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ self.context.create_physical_plan(&self.logical_plan).await
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use crate::{assert_batches_eq, execution::context::SessionConfig};
+
+ use super::*;
+
+ #[tokio::test]
+ async fn query_view() -> Result<()> {
+ let session_ctx = SessionContext::with_config(
+ SessionConfig::new().with_information_schema(true),
+ );
+
+ session_ctx
+ .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+ .await?
+ .collect()
+ .await?;
+
+ let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
+ session_ctx.sql(view_sql).await?.collect().await?;
+
+ let results = session_ctx.sql("SELECT * FROM information_schema.tables
WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
+ assert_eq!(results[0].num_rows(), 1);
+
+ let results = session_ctx
+ .sql("SELECT * FROM xyz")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec![
+ "+---------+---------+---------+",
+ "| column1 | column2 | column3 |",
+ "+---------+---------+---------+",
+ "| 1 | 2 | 3 |",
+ "| 4 | 5 | 6 |",
+ "+---------+---------+---------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn query_view_with_projection() -> Result<()> {
+ let session_ctx = SessionContext::with_config(
+ SessionConfig::new().with_information_schema(true),
+ );
+
+ session_ctx
+ .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+ .await?
+ .collect()
+ .await?;
+
+ let view_sql = "CREATE VIEW xyz AS SELECT column1, column2 FROM abc";
+ session_ctx.sql(view_sql).await?.collect().await?;
+
+ let results = session_ctx.sql("SELECT * FROM information_schema.tables
WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
+ assert_eq!(results[0].num_rows(), 1);
+
+ let results = session_ctx
+ .sql("SELECT column1 FROM xyz")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec![
+ "+---------+",
+ "| column1 |",
+ "+---------+",
+ "| 1 |",
+ "| 4 |",
+ "+---------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn query_view_with_filter() -> Result<()> {
+ let session_ctx = SessionContext::with_config(
+ SessionConfig::new().with_information_schema(true),
+ );
+
+ session_ctx
+ .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+ .await?
+ .collect()
+ .await?;
+
+ let view_sql = "CREATE VIEW xyz AS SELECT column1, column2 FROM abc";
+ session_ctx.sql(view_sql).await?.collect().await?;
+
+ let results = session_ctx.sql("SELECT * FROM information_schema.tables
WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
+ assert_eq!(results[0].num_rows(), 1);
+
+ let results = session_ctx
+ .sql("SELECT column1 FROM xyz WHERE column2 = 5")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec![
+ "+---------+",
+ "| column1 |",
+ "+---------+",
+ "| 4 |",
+ "+---------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn query_join_views() -> Result<()> {
+ let session_ctx = SessionContext::with_config(
+ SessionConfig::new().with_information_schema(true),
+ );
+
+ session_ctx
+ .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+ .await?
+ .collect()
+ .await?;
+
+ let view_sql = "CREATE VIEW xyz AS SELECT column1, column2 FROM abc";
+ session_ctx.sql(view_sql).await?.collect().await?;
+
+ let view_sql = "CREATE VIEW lmn AS SELECT column1, column3 FROM abc";
+ session_ctx.sql(view_sql).await?.collect().await?;
+
+ let results = session_ctx.sql("SELECT * FROM information_schema.tables
WHERE table_type='VIEW' AND (table_name = 'xyz' OR table_name =
'lmn')").await?.collect().await?;
+ assert_eq!(results[0].num_rows(), 2);
+
+ let results = session_ctx
+ .sql("SELECT * FROM xyz JOIN lmn USING (column1)")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec![
+ "+---------+---------+---------+",
+ "| column2 | column1 | column3 |",
+ "+---------+---------+---------+",
+ "| 2 | 1 | 3 |",
+ "| 5 | 4 | 6 |",
+ "+---------+---------+---------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn create_view_plan() -> Result<()> {
+ let session_ctx = SessionContext::with_config(
+ SessionConfig::new().with_information_schema(true),
+ );
+
+ session_ctx
+ .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+ .await?
+ .collect()
+ .await?;
+
+ let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
+ session_ctx.sql(view_sql).await?.collect().await?;
+
+ let results = session_ctx
+ .sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec![
+
"+---------------+--------------------------------------------------------+",
+ "| plan_type | plan
|",
+
"+---------------+--------------------------------------------------------+",
+ "| logical_plan | CreateView: \"xyz\"
|",
+ "| | Projection: #abc.column1, #abc.column2,
#abc.column3 |",
+ "| | TableScan: abc projection=Some([0, 1, 2])
|",
+ "| physical_plan | EmptyExec: produce_one_row=false
|",
+ "| |
|",
+
"+---------------+--------------------------------------------------------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ let results = session_ctx
+ .sql("EXPLAIN CREATE VIEW xyz AS SELECT * FROM abc WHERE column2 =
5")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec![
+
"+---------------+--------------------------------------------------------+",
+ "| plan_type | plan
|",
+
"+---------------+--------------------------------------------------------+",
+ "| logical_plan | CreateView: \"xyz\"
|",
+ "| | Projection: #abc.column1, #abc.column2,
#abc.column3 |",
+ "| | Filter: #abc.column2 = Int64(5)
|",
+ "| | TableScan: abc projection=Some([0, 1, 2])
|",
+ "| physical_plan | EmptyExec: produce_one_row=false
|",
+ "| |
|",
+
"+---------------+--------------------------------------------------------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ let results = session_ctx
+ .sql("EXPLAIN CREATE VIEW xyz AS SELECT column1, column2 FROM abc
WHERE column2 = 5")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec![
+ "+---------------+----------------------------------------------+",
+ "| plan_type | plan |",
+ "+---------------+----------------------------------------------+",
+ "| logical_plan | CreateView: \"xyz\"
|",
+ "| | Projection: #abc.column1, #abc.column2 |",
+ "| | Filter: #abc.column2 = Int64(5) |",
+ "| | TableScan: abc projection=Some([0, 1]) |",
+ "| physical_plan | EmptyExec: produce_one_row=false |",
+ "| | |",
+ "+---------------+----------------------------------------------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn create_or_replace_view() -> Result<()> {
+ let session_ctx = SessionContext::with_config(
+ SessionConfig::new().with_information_schema(true),
+ );
+
+ session_ctx
+ .sql("CREATE TABLE abc AS VALUES (1,2,3), (4,5,6)")
+ .await?
+ .collect()
+ .await?;
+
+ let view_sql = "CREATE VIEW xyz AS SELECT * FROM abc";
+ session_ctx.sql(view_sql).await?.collect().await?;
+
+ let view_sql = "CREATE OR REPLACE VIEW xyz AS SELECT column1 FROM abc";
+ session_ctx.sql(view_sql).await?.collect().await?;
+
+ let results = session_ctx.sql("SELECT * FROM information_schema.tables
WHERE table_type='VIEW' AND table_name = 'xyz'").await?.collect().await?;
+ assert_eq!(results[0].num_rows(), 1);
+
+ let results = session_ctx
+ .sql("SELECT * FROM xyz")
+ .await?
+ .collect()
+ .await?;
+
+ let expected = vec![
+ "+---------+",
+ "| column1 |",
+ "+---------+",
+ "| 1 |",
+ "| 4 |",
+ "+---------+",
+ ];
+
+ assert_batches_eq!(expected, &results);
+
+ Ok(())
+ }
+}
diff --git a/datafusion/core/src/execution/context.rs
b/datafusion/core/src/execution/context.rs
index 9f7dcd27e..7ca9a1e4d 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -30,7 +30,7 @@ use crate::{
parquet::{ParquetFormat, DEFAULT_PARQUET_EXTENSION},
FileFormat,
},
- MemTable,
+ MemTable, ViewTable,
},
logical_plan::{PlanType, ToStringifiedPlan},
optimizer::eliminate_filter::EliminateFilter,
@@ -62,7 +62,7 @@ use crate::datasource::TableProvider;
use crate::error::{DataFusionError, Result};
use crate::logical_plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
- DropTable, FileType, FunctionRegistry, LogicalPlan, LogicalPlanBuilder,
+ CreateView, DropTable, FileType, FunctionRegistry, LogicalPlan,
LogicalPlanBuilder,
UNNAMED_TABLE,
};
use crate::optimizer::common_subexpr_eliminate::CommonSubexprEliminate;
@@ -317,6 +317,38 @@ impl SessionContext {
}
}
+ LogicalPlan::CreateView(CreateView {
+ name,
+ input,
+ or_replace,
+ }) => {
+ let view = self.table(name.as_str());
+
+ match (or_replace, view) {
+ (true, Ok(_)) => {
+ self.deregister_table(name.as_str())?;
+ let plan = self.optimize(&input)?;
+ let table =
+ Arc::new(ViewTable::try_new(self.clone(),
plan.clone())?);
+
+ self.register_table(name.as_str(), table)?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ }
+ (_, Err(_)) => {
+ let plan = self.optimize(&input)?;
+ let table =
+ Arc::new(ViewTable::try_new(self.clone(),
plan.clone())?);
+
+ self.register_table(name.as_str(), table)?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
+ }
+ (false, Ok(_)) => Err(DataFusionError::Execution(format!(
+ "Table '{:?}' already exists",
+ name
+ ))),
+ }
+ }
+
LogicalPlan::DropTable(DropTable {
name, if_exists, ..
}) => {
diff --git a/datafusion/core/src/logical_plan/mod.rs
b/datafusion/core/src/logical_plan/mod.rs
index 55295e22e..048b7b3ec 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -60,8 +60,9 @@ pub use expr_visitor::{ExprVisitable, ExpressionVisitor,
Recursion};
pub use plan::{provider_as_source, source_as_provider};
pub use plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
- CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType,
Limit,
- LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
StringifiedPlan,
- Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode,
Values,
+ CreateView, CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint,
JoinType,
+ Limit, LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition,
+ StringifiedPlan, Subquery, TableScan, ToStringifiedPlan, Union,
+ UserDefinedLogicalNode, Values,
};
pub use registry::FunctionRegistry;
diff --git a/datafusion/core/src/logical_plan/plan.rs
b/datafusion/core/src/logical_plan/plan.rs
index 08d1fa120..e1adb4939 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -25,8 +25,8 @@ pub use crate::logical_expr::{
logical_plan::{
display::{GraphvizVisitor, IndentVisitor},
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema,
CreateExternalTable,
- CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain,
Extension,
- FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
+ CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation,
Explain,
+ Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan,
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values, Window,
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index 967ef58b3..defb42289 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -224,6 +224,7 @@ fn optimize(plan: &LogicalPlan, execution_props:
&ExecutionProps) -> Result<Logi
| LogicalPlan::Explain { .. }
| LogicalPlan::Analyze { .. }
| LogicalPlan::CreateMemoryTable(_)
+ | LogicalPlan::CreateView(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
diff --git a/datafusion/core/src/optimizer/filter_push_down.rs
b/datafusion/core/src/optimizer/filter_push_down.rs
index 0fd107b40..795f86392 100644
--- a/datafusion/core/src/optimizer/filter_push_down.rs
+++ b/datafusion/core/src/optimizer/filter_push_down.rs
@@ -560,7 +560,7 @@ mod tests {
use std::sync::Arc;
use super::*;
- use crate::datasource::TableProvider;
+ use crate::datasource::{TableProvider, TableType};
use crate::logical_plan::plan::provider_as_source;
use crate::logical_plan::{
and, col, lit, sum, union_with_alias, DFSchema, Expr,
LogicalPlanBuilder,
@@ -1354,6 +1354,10 @@ mod tests {
]))
}
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
async fn scan(
&self,
_: &Option<Vec<usize>>,
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs
b/datafusion/core/src/optimizer/projection_push_down.rs
index 0979d8f5b..9cbec1e90 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -471,6 +471,7 @@ fn optimize_plan(
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
+ | LogicalPlan::CreateView(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
diff --git a/datafusion/core/src/optimizer/utils.rs
b/datafusion/core/src/optimizer/utils.rs
index 2c56b5f89..0e54fd93e 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -25,9 +25,9 @@ use datafusion_expr::logical_plan::{
};
use crate::logical_plan::{
- and, build_join_schema, Column, CreateMemoryTable, DFSchemaRef, Expr,
ExprVisitable,
- Limit, LogicalPlan, LogicalPlanBuilder, Operator, Partitioning, Recursion,
- Repartition, Union, Values,
+ and, build_join_schema, Column, CreateMemoryTable, CreateView,
DFSchemaRef, Expr,
+ ExprVisitable, Limit, LogicalPlan, LogicalPlanBuilder, Operator,
Partitioning,
+ Recursion, Repartition, Union, Values,
};
use crate::prelude::lit;
use crate::scalar::ScalarValue;
@@ -258,6 +258,13 @@ pub fn from_plan(
name: name.clone(),
if_not_exists: *if_not_exists,
})),
+ LogicalPlan::CreateView(CreateView {
+ name, or_replace, ..
+ }) => Ok(LogicalPlan::CreateView(CreateView {
+ input: Arc::new(inputs[0].clone()),
+ name: name.clone(),
+ or_replace: *or_replace,
+ })),
LogicalPlan::Extension(e) => Ok(LogicalPlan::Extension(Extension {
node: e.node.from_template(expr, inputs),
})),
diff --git a/datafusion/core/src/physical_plan/planner.rs
b/datafusion/core/src/physical_plan/planner.rs
index f6b3842f2..47829ad79 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -880,7 +880,7 @@ impl DefaultPhysicalPlanner {
"Unsupported logical plan: CreateCatalog".to_string(),
))
}
- | LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable
(_) => {
+ | LogicalPlan::CreateMemoryTable(_) | LogicalPlan::DropTable
(_) | LogicalPlan::CreateView(_) => {
// Create a dummy exec.
Ok(Arc::new(EmptyExec::new(
false,
diff --git a/datafusion/core/src/sql/planner.rs
b/datafusion/core/src/sql/planner.rs
index 143a0617a..4c6d26b66 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -31,8 +31,8 @@ use crate::logical_plan::{
and, builder::expand_qualified_wildcard, builder::expand_wildcard, col,
lit,
normalize_col, normalize_col_with_schemas, union_with_alias, Column,
CreateCatalog,
CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable,
- CreateMemoryTable, DFSchema, DFSchemaRef, DropTable, Expr, FileType,
LogicalPlan,
- LogicalPlanBuilder, Operator, PlanType, ToDFSchema, ToStringifiedPlan,
+ CreateMemoryTable, CreateView, DFSchema, DFSchemaRef, DropTable, Expr,
FileType,
+ LogicalPlan, LogicalPlanBuilder, Operator, PlanType, ToDFSchema,
ToStringifiedPlan,
};
use crate::optimizer::utils::exprlist_to_columns;
use crate::prelude::JoinType;
@@ -174,6 +174,21 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if_not_exists,
}))
}
+ Statement::CreateView {
+ or_replace,
+ name,
+ columns,
+ query,
+ with_options,
+ ..
+ } if columns.is_empty() && with_options.is_empty() => {
+ let plan = self.query_to_plan(*query, &mut HashMap::new())?;
+ Ok(LogicalPlan::CreateView(CreateView {
+ name: name.to_string(),
+ input: Arc::new(plan),
+ or_replace,
+ }))
+ }
Statement::CreateTable { .. } =>
Err(DataFusionError::NotImplemented(
"Only `CREATE TABLE table_name AS SELECT ...` statement is
supported"
.to_string(),
diff --git a/datafusion/core/tests/custom_sources.rs
b/datafusion/core/tests/custom_sources.rs
index 81e4706de..f1356f7d4 100644
--- a/datafusion/core/tests/custom_sources.rs
+++ b/datafusion/core/tests/custom_sources.rs
@@ -24,7 +24,10 @@ use datafusion::from_slice::FromSlice;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::scalar::ScalarValue;
-use datafusion::{datasource::TableProvider, physical_plan::collect};
+use datafusion::{
+ datasource::{TableProvider, TableType},
+ physical_plan::collect,
+};
use datafusion::{error::Result, physical_plan::DisplayFormatType};
use datafusion::execution::context::{SessionContext, TaskContext};
@@ -192,6 +195,10 @@ impl TableProvider for CustomTableProvider {
TEST_CUSTOM_SCHEMA_REF!()
}
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
async fn scan(
&self,
projection: &Option<Vec<usize>>,
diff --git a/datafusion/core/tests/provider_filter_pushdown.rs
b/datafusion/core/tests/provider_filter_pushdown.rs
index 49cd70143..c8fe483ea 100644
--- a/datafusion/core/tests/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/provider_filter_pushdown.rs
@@ -19,7 +19,7 @@ use arrow::array::{as_primitive_array, Int32Builder,
UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
-use datafusion::datasource::datasource::TableProvider;
+use datafusion::datasource::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionContext, TaskContext};
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
@@ -132,6 +132,10 @@ impl TableProvider for CustomProvider {
self.zero_batch.schema()
}
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
async fn scan(
&self,
_: &Option<Vec<usize>>,
diff --git a/datafusion/core/tests/statistics.rs
b/datafusion/core/tests/statistics.rs
index 031506704..99b53a62d 100644
--- a/datafusion/core/tests/statistics.rs
+++ b/datafusion/core/tests/statistics.rs
@@ -21,7 +21,7 @@ use std::{any::Any, sync::Arc};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::{
- datasource::TableProvider,
+ datasource::{TableProvider, TableType},
error::Result,
logical_plan::Expr,
physical_plan::{
@@ -68,6 +68,10 @@ impl TableProvider for StatisticsValidation {
Arc::clone(&self.schema)
}
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
async fn scan(
&self,
projection: &Option<Vec<usize>>,
diff --git a/datafusion/expr/src/logical_plan/mod.rs
b/datafusion/expr/src/logical_plan/mod.rs
index a37729f7d..3681ff14e 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -21,10 +21,10 @@ mod plan;
pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema,
CreateExternalTable,
- CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain,
Extension, FileType,
- Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning,
PlanType,
- PlanVisitor, Projection, Repartition, Sort, StringifiedPlan, Subquery,
SubqueryAlias,
- TableScan, ToStringifiedPlan, Union, Values, Window,
+ CreateMemoryTable, CreateView, CrossJoin, DropTable, EmptyRelation,
Explain,
+ Extension, FileType, Filter, Join, JoinConstraint, JoinType, Limit,
LogicalPlan,
+ Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
StringifiedPlan,
+ Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, Values,
Window,
};
pub use display::display_schema;
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 579898dbe..ab96dbe73 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -77,6 +77,8 @@ pub enum LogicalPlan {
CreateExternalTable(CreateExternalTable),
/// Creates an in memory table.
CreateMemoryTable(CreateMemoryTable),
+ /// Creates a new view.
+ CreateView(CreateView),
/// Creates a new catalog schema.
CreateCatalogSchema(CreateCatalogSchema),
/// Creates a new catalog (aka "Database").
@@ -124,9 +126,8 @@ impl LogicalPlan {
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
LogicalPlan::Union(Union { schema, .. }) => schema,
- LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) =>
{
- input.schema()
- }
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
+ | LogicalPlan::CreateView(CreateView { input, .. }) =>
input.schema(),
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, ..
}) => {
schema
}
@@ -185,6 +186,7 @@ impl LogicalPlan {
| LogicalPlan::Repartition(Repartition { input, .. })
| LogicalPlan::Sort(Sort { input, .. })
| LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
+ | LogicalPlan::CreateView(CreateView { input, .. })
| LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
LogicalPlan::DropTable(_) => vec![],
}
@@ -235,6 +237,7 @@ impl LogicalPlan {
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
| LogicalPlan::CreateMemoryTable(_)
+ | LogicalPlan::CreateView(_)
| LogicalPlan::CreateCatalogSchema(_)
| LogicalPlan::CreateCatalog(_)
| LogicalPlan::DropTable(_)
@@ -266,7 +269,8 @@ impl LogicalPlan {
LogicalPlan::Union(Union { inputs, .. }) =>
inputs.iter().collect(),
LogicalPlan::Explain(explain) => vec![&explain.plan],
LogicalPlan::Analyze(analyze) => vec![&analyze.input],
- LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) =>
{
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
+ | LogicalPlan::CreateView(CreateView { input, .. }) => {
vec![input]
}
// plans without inputs
@@ -405,7 +409,8 @@ impl LogicalPlan {
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
input.accept(visitor)?
}
- LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) =>
{
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
+ | LogicalPlan::CreateView(CreateView { input, .. }) => {
input.accept(visitor)?
}
LogicalPlan::Extension(extension) => {
@@ -793,6 +798,9 @@ impl LogicalPlan {
}) => {
write!(f, "CreateMemoryTable: {:?}", name)
}
+ LogicalPlan::CreateView(CreateView { name, .. }) => {
+ write!(f, "CreateView: {:?}", name)
+ }
LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
schema_name,
..
@@ -1036,6 +1044,17 @@ pub struct CreateMemoryTable {
pub if_not_exists: bool,
}
+/// Creates a view.
+#[derive(Clone)]
+pub struct CreateView {
+    /// The name of the view
+    pub name: String,
+ /// The logical plan
+ pub input: Arc<LogicalPlan>,
+    /// If true (CREATE OR REPLACE), replace an existing view of the same name instead of erroring
+    pub or_replace: bool,
+}
+
/// Types of files to parse as DataFrames
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FileType {