This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 6dfbe48328 Move `TransactionStart`/`TransactionEnd`/`SetVariable` into `LogicalPlan::Statement` (#5842) 6dfbe48328 is described below commit 6dfbe483289ef0b10c82ffa98e3be8b5201acb78 Author: Andrew Lamb <and...@nerdnetworks.org> AuthorDate: Tue Apr 4 16:15:11 2023 +0200 Move `TransactionStart`/`TransactionEnd`/`SetVariable` into `LogicalPlan::Statement` (#5842) --- datafusion/core/src/execution/context.rs | 12 +- datafusion/core/src/physical_plan/planner.rs | 16 +- datafusion/expr/src/lib.rs | 12 +- datafusion/expr/src/logical_plan/mod.rs | 16 +- datafusion/expr/src/logical_plan/plan.rs | 141 ++-------------- datafusion/expr/src/logical_plan/statement.rs | 188 +++++++++++++++++++++ datafusion/expr/src/utils.rs | 4 +- .../optimizer/src/common_subexpr_eliminate.rs | 4 +- datafusion/proto/src/logical_plan/mod.rs | 10 +- datafusion/sql/src/statement.rs | 33 ++-- 10 files changed, 251 insertions(+), 185 deletions(-) diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 9ff7c66004..eb9f6ba3c4 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -31,7 +31,9 @@ use crate::{ optimizer::PhysicalOptimizerRule, }, }; -use datafusion_expr::{DescribeTable, DmlStatement, StringifiedPlan, WriteOp}; +use datafusion_expr::{ + logical_plan::Statement, DescribeTable, DmlStatement, StringifiedPlan, WriteOp, +}; pub use datafusion_physical_expr::execution_props::ExecutionProps; use datafusion_physical_expr::var_provider::is_system_variables; use parking_lot::RwLock; @@ -444,9 +446,11 @@ impl SessionContext { } } - LogicalPlan::SetVariable(SetVariable { - variable, value, .. - }) => { + LogicalPlan::Statement(Statement::SetVariable(SetVariable { + variable, + value, + .. + })) => { let mut state = self.state.write(); state.config.options_mut().set(&variable, &value)?; drop(state); diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index f9a9209454..25afdb4d1c 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -1156,21 +1156,11 @@ impl DefaultPhysicalPlanner { "Unsupported logical plan: Dml".to_string(), )) } - LogicalPlan::TransactionStart(_) => { + LogicalPlan::Statement(statement) => { // DataFusion is a read-only query engine, but also a library, so consumers may implement this + let name = statement.name(); Err(DataFusionError::NotImplemented( - "Unsupported logical plan: TransactionStart".to_string(), - )) - } - LogicalPlan::TransactionEnd(_) => { - // DataFusion is a read-only query engine, but also a library, so consumers may implement this - Err(DataFusionError::NotImplemented( - "Unsupported logical plan: TransactionEnd".to_string(), - )) - } - LogicalPlan::SetVariable(_) => { - Err(DataFusionError::Internal( - "Unsupported logical plan: SetVariable must be root of the plan".to_string(), + format!("Unsupported logical plan: Statement({name})") )) } LogicalPlan::DescribeTable(_) => { diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index 8291f6f34b..0d5963021e 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -70,13 +70,15 @@ pub use logical_plan::{ builder::{ build_join_schema, union, wrap_projection_for_join_if_necessary, UNNAMED_TABLE, }, - Aggregate, CreateCatalog, CreateCatalogSchema, CreateExternalTable, + Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CreateView, CrossJoin, DescribeTable, Distinct, DmlStatement, DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, - JoinType, Limit, LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType, Projection, - Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, - ToStringifiedPlan, Union, Unnest, UserDefinedLogicalNode, UserDefinedLogicalNodeCore, - Values, Window, WriteOp, + JoinType, Limit, LogicalPlan, LogicalPlanBuilder, Partitioning, PlanType, Prepare, + Projection, Repartition, SetVariable, Sort, Statement, StringifiedPlan, Subquery, + SubqueryAlias, TableScan, ToStringifiedPlan, TransactionAccessMode, + TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, + Union, Unnest, UserDefinedLogicalNode, UserDefinedLogicalNodeCore, Values, Window, + WriteOp, }; pub use nullif::SUPPORTED_NULLIF_TYPES; pub use operator::Operator; diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 764d02547b..d2b0410097 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -19,16 +19,20 @@ pub mod builder; pub mod display; mod extension; mod plan; +mod statement; pub use builder::{table_scan, LogicalPlanBuilder}; pub use plan::{ Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable, - CreateMemoryTable, CreateView, CrossJoin, DescribeTable, Distinct, DmlStatement, - DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, - JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection, - Repartition, SetVariable, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, - ToStringifiedPlan, TransactionAccessMode, TransactionConclusion, TransactionEnd, - TransactionIsolationLevel, TransactionStart, Union, Unnest, Values, Window, WriteOp, + CreateMemoryTable, CreateView, CrossJoin, DescribeTable, Distinct, DropTable, + DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, + Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection, Repartition, Sort, + StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union, + Unnest, Values, Window, +}; +pub use statement::{ + DmlStatement, SetVariable, Statement, TransactionAccessMode, TransactionConclusion, + TransactionEnd, TransactionIsolationLevel, TransactionStart, WriteOp, }; pub use display::display_schema; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index b4fc9edd6b..e527a0a70c 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -19,6 +19,8 @@ use crate::logical_plan::builder::validate_unique_names; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; +use crate::logical_plan::statement::{DmlStatement, Statement}; + use crate::logical_plan::plan; use crate::utils::{ enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs, from_plan, @@ -88,6 +90,8 @@ pub enum LogicalPlan { SubqueryAlias(SubqueryAlias), /// Skip some number of rows, and then fetch some number of rows. Limit(Limit), + /// [`Statement`] + Statement(Statement), /// Creates an external table. CreateExternalTable(CreateExternalTable), /// Creates an in memory table. @@ -116,8 +120,6 @@ pub enum LogicalPlan { Extension(Extension), /// Remove duplicate rows from the input Distinct(Distinct), - /// Set a Variable - SetVariable(SetVariable), /// Prepare a statement Prepare(Prepare), /// Insert / Update / Delete @@ -126,10 +128,6 @@ pub enum LogicalPlan { DescribeTable(DescribeTable), /// Unnest a column that contains a nested list type. Unnest(Unnest), - // Begin a transaction - TransactionStart(TransactionStart), - // Commit or rollback a transaction - TransactionEnd(TransactionEnd), } impl LogicalPlan { @@ -151,6 +149,7 @@ impl LogicalPlan { LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema, LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(), LogicalPlan::Limit(Limit { input, .. }) => input.schema(), + LogicalPlan::Statement(statement) => statement.schema(), LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(), LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema, LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => { @@ -169,14 +168,11 @@ impl LogicalPlan { LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema, LogicalPlan::DropTable(DropTable { schema, .. }) => schema, LogicalPlan::DropView(DropView { schema, .. }) => schema, - LogicalPlan::SetVariable(SetVariable { schema, .. }) => schema, LogicalPlan::DescribeTable(DescribeTable { dummy_schema, .. }) => { dummy_schema } LogicalPlan::Dml(DmlStatement { table_schema, .. }) => table_schema, LogicalPlan::Unnest(Unnest { schema, .. }) => schema, - LogicalPlan::TransactionStart(TransactionStart { schema, .. }) => schema, - LogicalPlan::TransactionEnd(TransactionEnd { schema, .. }) => schema, } } @@ -243,12 +239,10 @@ impl LogicalPlan { self.inputs().iter().map(|p| p.schema()).collect() } // return empty - LogicalPlan::DropTable(_) + LogicalPlan::Statement(_) + | LogicalPlan::DropTable(_) | LogicalPlan::DropView(_) - | LogicalPlan::DescribeTable(_) - | LogicalPlan::TransactionStart(_) - | LogicalPlan::TransactionEnd(_) - | LogicalPlan::SetVariable(_) => vec![], + | LogicalPlan::DescribeTable(_) => vec![], } } @@ -362,15 +356,13 @@ impl LogicalPlan { | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Limit(_) + | LogicalPlan::Statement(_) | LogicalPlan::CreateExternalTable(_) | LogicalPlan::CreateMemoryTable(_) | LogicalPlan::CreateView(_) | LogicalPlan::CreateCatalogSchema(_) | LogicalPlan::CreateCatalog(_) | LogicalPlan::DropTable(_) - | LogicalPlan::TransactionStart(_) - | LogicalPlan::TransactionEnd(_) - | LogicalPlan::SetVariable(_) | LogicalPlan::DropView(_) | LogicalPlan::CrossJoin(_) | LogicalPlan::Analyze(_) @@ -414,15 +406,13 @@ impl LogicalPlan { LogicalPlan::Unnest(Unnest { input, .. }) => vec![input], // plans without inputs LogicalPlan::TableScan { .. } + | LogicalPlan::Statement { .. } | LogicalPlan::EmptyRelation { .. } | LogicalPlan::Values { .. } | LogicalPlan::CreateExternalTable(_) | LogicalPlan::CreateCatalogSchema(_) | LogicalPlan::CreateCatalog(_) | LogicalPlan::DropTable(_) - | LogicalPlan::TransactionStart(_) - | LogicalPlan::TransactionEnd(_) - | LogicalPlan::SetVariable(_) | LogicalPlan::DropView(_) | LogicalPlan::DescribeTable(_) => vec![], } @@ -1039,6 +1029,9 @@ impl LogicalPlan { LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => { write!(f, "SubqueryAlias: {alias}") } + LogicalPlan::Statement(statement) => { + write!(f, "{}", statement.display()) + } LogicalPlan::CreateExternalTable(CreateExternalTable { ref name, .. @@ -1077,30 +1070,11 @@ impl LogicalPlan { }) => { write!(f, "DropTable: {name:?} if not exist:={if_exists}") } - LogicalPlan::TransactionStart(TransactionStart { - access_mode, - isolation_level, - .. - }) => { - write!(f, "TransactionStart: {access_mode:?} {isolation_level:?}") - } - LogicalPlan::TransactionEnd(TransactionEnd { - conclusion, - chain, - .. - }) => { - write!(f, "TransactionEnd: {conclusion:?} chain:={chain}") - } LogicalPlan::DropView(DropView { name, if_exists, .. }) => { write!(f, "DropView: {name:?} if not exist:={if_exists}") } - LogicalPlan::SetVariable(SetVariable { - variable, value, .. - }) => { - write!(f, "SetVariable: set {variable:?} to {value:?}") - } LogicalPlan::Distinct(Distinct { .. }) => { write!(f, "Distinct:") } @@ -1234,18 +1208,6 @@ pub struct DropView { pub schema: DFSchemaRef, } -/// Set a Variable's value -- value in -/// [`ConfigOptions`](datafusion_common::config::ConfigOptions) -#[derive(Clone, PartialEq, Eq, Hash)] -pub struct SetVariable { - /// The variable name - pub variable: String, - /// The value to set - pub value: String, - /// Dummy schema - pub schema: DFSchemaRef, -} - /// Produces no rows: An empty relation with an empty schema #[derive(Clone, PartialEq, Eq, Hash)] pub struct EmptyRelation { @@ -1569,83 +1531,6 @@ impl Hash for CreateExternalTable { } } -#[derive(Clone, PartialEq, Eq, Hash)] -pub enum WriteOp { - Insert, - Delete, - Update, - Ctas, -} - -impl Display for WriteOp { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - WriteOp::Insert => write!(f, "Insert"), - WriteOp::Delete => write!(f, "Delete"), - WriteOp::Update => write!(f, "Update"), - WriteOp::Ctas => write!(f, "Ctas"), - } - } -} - -/// The operator that modifies the content of a database (adapted from substrait WriteRel) -#[derive(Clone, PartialEq, Eq, Hash)] -pub struct DmlStatement { - /// The table name - pub table_name: OwnedTableReference, - /// The schema of the table (must align with Rel input) - pub table_schema: DFSchemaRef, - /// The type of operation to perform - pub op: WriteOp, - /// The relation that determines the tuples to add/remove/modify the schema must match with table_schema - pub input: Arc<LogicalPlan>, -} - -/// Indicates if a transaction was committed or aborted -#[derive(Clone, PartialEq, Eq, Hash, Debug)] -pub enum TransactionConclusion { - Commit, - Rollback, -} - -/// Indicates if this transaction is allowed to write -#[derive(Clone, PartialEq, Eq, Hash, Debug)] -pub enum TransactionAccessMode { - ReadOnly, - ReadWrite, -} - -/// Indicates ANSI transaction isolation level -#[derive(Clone, PartialEq, Eq, Hash, Debug)] -pub enum TransactionIsolationLevel { - ReadUncommitted, - ReadCommitted, - RepeatableRead, - Serializable, -} - -/// Indicator that the following statements should be committed or rolled back atomically -#[derive(Clone, PartialEq, Eq, Hash)] -pub struct TransactionStart { - /// indicates if transaction is allowed to write - pub access_mode: TransactionAccessMode, - // indicates ANSI isolation level - pub isolation_level: TransactionIsolationLevel, - /// Empty schema - pub schema: DFSchemaRef, -} - -/// Indicator that any current transaction should be terminated -#[derive(Clone, PartialEq, Eq, Hash)] -pub struct TransactionEnd { - /// whether the transaction committed or aborted - pub conclusion: TransactionConclusion, - /// if specified a new transaction is immediately started with same characteristics - pub chain: bool, - /// Empty schema - pub schema: DFSchemaRef, -} - /// Prepare a statement but do not execute it. Prepare statements can have 0 or more /// `Expr::Placeholder` expressions that are filled in during execution #[derive(Clone, PartialEq, Eq, Hash)] diff --git a/datafusion/expr/src/logical_plan/statement.rs b/datafusion/expr/src/logical_plan/statement.rs new file mode 100644 index 0000000000..dad09d996b --- /dev/null +++ b/datafusion/expr/src/logical_plan/statement.rs @@ -0,0 +1,188 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + fmt::{self, Display}, + sync::Arc, +}; + +use datafusion_common::{DFSchemaRef, OwnedTableReference}; + +use crate::LogicalPlan; + +/// Various types of Statements. +/// +/// # Transactions: +/// +/// While DataFusion does not offer support transactions, it provides +/// [`LogicalPlan`](crate::LogicalPlan) support to assist building +/// database systems using DataFusion +#[derive(Clone, PartialEq, Eq, Hash)] +pub enum Statement { + // Begin a transaction + TransactionStart(TransactionStart), + // Commit or rollback a transaction + TransactionEnd(TransactionEnd), + /// Set a Variable + SetVariable(SetVariable), +} + +impl Statement { + /// Get a reference to the logical plan's schema + pub fn schema(&self) -> &DFSchemaRef { + match self { + Statement::TransactionStart(TransactionStart { schema, .. }) => schema, + Statement::TransactionEnd(TransactionEnd { schema, .. }) => schema, + Statement::SetVariable(SetVariable { schema, .. }) => schema, + } + } + + /// Return a descriptive string describing the type of this + /// [`Statement`] + pub fn name(&self) -> &str { + match self { + Statement::TransactionStart(_) => "TransactionStart", + Statement::TransactionEnd(_) => "TransactionEnd", + Statement::SetVariable(_) => "SetVariable", + } + } + + /// Return a `format`able structure with the a human readable + /// description of this LogicalPlan node per node, not including + /// children. + /// + /// See [LogicalPlan::display] for an example + pub fn display(&self) -> impl fmt::Display + '_ { + struct Wrapper<'a>(&'a Statement); + impl<'a> Display for Wrapper<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self.0 { + Statement::TransactionStart(TransactionStart { + access_mode, + isolation_level, + .. + }) => { + write!(f, "TransactionStart: {access_mode:?} {isolation_level:?}") + } + Statement::TransactionEnd(TransactionEnd { + conclusion, + chain, + .. + }) => { + write!(f, "TransactionEnd: {conclusion:?} chain:={chain}") + } + Statement::SetVariable(SetVariable { + variable, value, .. + }) => { + write!(f, "SetVariable: set {variable:?} to {value:?}") + } + } + } + } + Wrapper(self) + } +} + +/// The operator that modifies the content of a database (adapted from +/// substrait WriteRel) +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct DmlStatement { + /// The table name + pub table_name: OwnedTableReference, + /// The schema of the table (must align with Rel input) + pub table_schema: DFSchemaRef, + /// The type of operation to perform + pub op: WriteOp, + /// The relation that determines the tuples to add/remove/modify the schema must match with table_schema + pub input: Arc<LogicalPlan>, +} + +#[derive(Clone, PartialEq, Eq, Hash)] +pub enum WriteOp { + Insert, + Delete, + Update, + Ctas, +} + +impl Display for WriteOp { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + WriteOp::Insert => write!(f, "Insert"), + WriteOp::Delete => write!(f, "Delete"), + WriteOp::Update => write!(f, "Update"), + WriteOp::Ctas => write!(f, "Ctas"), + } + } +} + +/// Indicates if a transaction was committed or aborted +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub enum TransactionConclusion { + Commit, + Rollback, +} + +/// Indicates if this transaction is allowed to write +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub enum TransactionAccessMode { + ReadOnly, + ReadWrite, +} + +/// Indicates ANSI transaction isolation level +#[derive(Clone, PartialEq, Eq, Hash, Debug)] +pub enum TransactionIsolationLevel { + ReadUncommitted, + ReadCommitted, + RepeatableRead, + Serializable, +} + +/// Indicator that the following statements should be committed or rolled back atomically +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct TransactionStart { + /// indicates if transaction is allowed to write + pub access_mode: TransactionAccessMode, + // indicates ANSI isolation level + pub isolation_level: TransactionIsolationLevel, + /// Empty schema + pub schema: DFSchemaRef, +} + +/// Indicator that any current transaction should be terminated +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct TransactionEnd { + /// whether the transaction committed or aborted + pub conclusion: TransactionConclusion, + /// if specified a new transaction is immediately started with same characteristics + pub chain: bool, + /// Empty schema + pub schema: DFSchemaRef, +} + +/// Set a Variable's value -- value in +/// [`ConfigOptions`](datafusion_common::config::ConfigOptions) +#[derive(Clone, PartialEq, Eq, Hash)] +pub struct SetVariable { + /// The variable name + pub variable: String, + /// The value to set + pub value: String, + /// Dummy schema + pub schema: DFSchemaRef, +} diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 726365f574..f3975e8346 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -914,9 +914,7 @@ pub fn from_plan( | LogicalPlan::CreateExternalTable(_) | LogicalPlan::DropTable(_) | LogicalPlan::DropView(_) - | LogicalPlan::TransactionStart(_) - | LogicalPlan::TransactionEnd(_) - | LogicalPlan::SetVariable(_) + | LogicalPlan::Statement(_) | LogicalPlan::CreateCatalogSchema(_) | LogicalPlan::CreateCatalog(_) => { // All of these plan types have no inputs / exprs so should not be called diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index d3fba585fe..719b17110b 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -237,9 +237,7 @@ impl OptimizerRule for CommonSubexprEliminate { | LogicalPlan::CreateCatalog(_) | LogicalPlan::DropTable(_) | LogicalPlan::DropView(_) - | LogicalPlan::TransactionStart(_) - | LogicalPlan::TransactionEnd(_) - | LogicalPlan::SetVariable(_) + | LogicalPlan::Statement(_) | LogicalPlan::DescribeTable(_) | LogicalPlan::Distinct(_) | LogicalPlan::Extension(_) diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index 936ce4984c..ff86fc8634 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -1360,18 +1360,12 @@ impl AsLogicalPlan for LogicalPlanNode { LogicalPlan::DropView(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for DropView", )), - LogicalPlan::SetVariable(_) => Err(proto_error( - "LogicalPlan serde is not yet implemented for SetVariable", + LogicalPlan::Statement(_) => Err(proto_error( + "LogicalPlan serde is not yet implemented for Statement", )), LogicalPlan::Dml(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for Dml", )), - LogicalPlan::TransactionStart(_) => Err(proto_error( - "LogicalPlan serde is not yet implemented for Transactions", - )), - LogicalPlan::TransactionEnd(_) => Err(proto_error( - "LogicalPlan serde is not yet implemented for Transactions", - )), LogicalPlan::DescribeTable(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for DescribeTable", )), diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index b396976186..ef1ecfef58 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -30,17 +30,15 @@ use datafusion_common::{ }; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; use datafusion_expr::logical_plan::builder::project; -use datafusion_expr::logical_plan::{ - Analyze, Prepare, TransactionAccessMode, TransactionConclusion, TransactionEnd, - TransactionIsolationLevel, TransactionStart, -}; use datafusion_expr::utils::expr_to_columns; use datafusion_expr::{ - cast, col, CreateCatalog, CreateCatalogSchema, + cast, col, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable as PlanCreateExternalTable, CreateMemoryTable, CreateView, DescribeTable, DmlStatement, DropTable, DropView, EmptyRelation, Explain, - ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, SetVariable, - ToStringifiedPlan, WriteOp, + ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder, PlanType, Prepare, + SetVariable, Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode, + TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart, + WriteOp, }; use sqlparser::ast; use sqlparser::ast::{ @@ -435,25 +433,28 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { TransactionAccessMode::ReadWrite } }; - Ok(LogicalPlan::TransactionStart(TransactionStart { + let statement = PlanStatement::TransactionStart(TransactionStart { access_mode, isolation_level, schema: DFSchemaRef::new(DFSchema::empty()), - })) + }); + Ok(LogicalPlan::Statement(statement)) } Statement::Commit { chain } => { - Ok(LogicalPlan::TransactionEnd(TransactionEnd { + let statement = PlanStatement::TransactionEnd(TransactionEnd { conclusion: TransactionConclusion::Commit, chain, schema: DFSchemaRef::new(DFSchema::empty()), - })) + }); + Ok(LogicalPlan::Statement(statement)) } Statement::Rollback { chain } => { - Ok(LogicalPlan::TransactionEnd(TransactionEnd { + let statement = PlanStatement::TransactionEnd(TransactionEnd { conclusion: TransactionConclusion::Rollback, chain, schema: DFSchemaRef::new(DFSchema::empty()), - })) + }); + Ok(LogicalPlan::Statement(statement)) } _ => Err(DataFusionError::NotImplemented(format!( @@ -736,11 +737,13 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { } }; - Ok(LogicalPlan::SetVariable(SetVariable { + let statement = PlanStatement::SetVariable(SetVariable { variable: variable_lower, value: value_string, schema: DFSchemaRef::new(DFSchema::empty()), - })) + }); + + Ok(LogicalPlan::Statement(statement)) } fn delete_to_plan(