This is an automated email from the ASF dual-hosted git repository.
jonah pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d2a15b3f8a feat: support logical plan for `EXECUTE` statement (#13194)
d2a15b3f8a is described below
commit d2a15b3f8a9394b4c2bf283de692f73aecc9e17c
Author: Jonah Gao <[email protected]>
AuthorDate: Fri Nov 1 10:51:31 2024 +0800
feat: support logical plan for `EXECUTE` statement (#13194)
* Add Execute LogicalPlan
* Fix compile
* Add tests
---
datafusion/core/src/physical_planner.rs | 3 ++
datafusion/expr/src/expr_rewriter/mod.rs | 1 +
datafusion/expr/src/logical_plan/display.rs | 17 +++++++---
datafusion/expr/src/logical_plan/mod.rs | 2 +-
datafusion/expr/src/logical_plan/plan.rs | 39 ++++++++++++++++++++++
datafusion/expr/src/logical_plan/tree_node.rs | 26 ++++++++++++---
.../optimizer/src/common_subexpr_eliminate.rs | 3 +-
.../optimizer/src/optimize_projections/mod.rs | 3 +-
datafusion/proto/src/logical_plan/mod.rs | 3 ++
datafusion/sql/src/statement.rs | 26 ++++++++++++++-
datafusion/sql/src/unparser/plan.rs | 1 +
datafusion/sqllogictest/test_files/prepare.slt | 18 ++++++++++
12 files changed, 130 insertions(+), 12 deletions(-)
diff --git a/datafusion/core/src/physical_planner.rs
b/datafusion/core/src/physical_planner.rs
index 5aa702550b..c16f3ad104 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1201,6 +1201,9 @@ impl DefaultPhysicalPlanner {
// statement can be prepared)
return not_impl_err!("Unsupported logical plan: Prepare");
}
+ LogicalPlan::Execute(_) => {
+ return not_impl_err!("Unsupported logical plan: Execute");
+ }
LogicalPlan::Dml(dml) => {
// DataFusion is a read-only query engine, but also a library,
so consumers may implement this
return not_impl_err!("Unsupported logical plan: Dml({0})",
dml.op);
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs
b/datafusion/expr/src/expr_rewriter/mod.rs
index d6d5c3e293..c86696854c 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -314,6 +314,7 @@ impl NamePreserver {
| LogicalPlan::Join(_)
| LogicalPlan::TableScan(_)
| LogicalPlan::Limit(_)
+ | LogicalPlan::Execute(_)
),
}
}
diff --git a/datafusion/expr/src/logical_plan/display.rs
b/datafusion/expr/src/logical_plan/display.rs
index c0549451a7..9aea7747c4 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -20,10 +20,10 @@ use std::collections::HashMap;
use std::fmt;
use crate::{
- expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn,
DmlStatement, Expr,
- Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection,
RecursiveQuery,
- Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown,
TableScan,
- Unnest, Values, Window,
+ expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn,
DmlStatement, Execute,
+ Expr, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection,
+ RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias,
+ TableProviderFilterPushDown, TableScan, Unnest, Values, Window,
};
use crate::dml::CopyTo;
@@ -626,6 +626,15 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
"Data Types": format!("{:?}", data_types)
})
}
+ LogicalPlan::Execute(Execute {
+ name, parameters, ..
+ }) => {
+ json!({
+ "Node Type": "Execute",
+ "Name": name,
+ "Parameters": expr_vec_fmt!(parameters),
+ })
+ }
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
json!({
"Node Type": "DescribeTable"
diff --git a/datafusion/expr/src/logical_plan/mod.rs
b/datafusion/expr/src/logical_plan/mod.rs
index 80a8962124..59654a2278 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -36,7 +36,7 @@ pub use ddl::{
pub use dml::{DmlStatement, WriteOp};
pub use plan::{
projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable,
Distinct,
- DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join,
+ DistinctOn, EmptyRelation, Execute, Explain, Extension, FetchType, Filter,
Join,
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
Prepare,
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan,
Subquery,
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
diff --git a/datafusion/expr/src/logical_plan/plan.rs
b/datafusion/expr/src/logical_plan/plan.rs
index 8ba2a44842..191a42e38e 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -266,6 +266,8 @@ pub enum LogicalPlan {
/// Prepare a statement and find any bind parameters
/// (e.g. `?`). This is used to implement SQL-prepared statements.
Prepare(Prepare),
+ /// Execute a prepared statement. This is used to implement SQL 'EXECUTE'.
+ Execute(Execute),
/// Data Manipulation Language (DML): Insert / Update / Delete
Dml(DmlStatement),
/// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
@@ -314,6 +316,7 @@ impl LogicalPlan {
LogicalPlan::Subquery(Subquery { subquery, .. }) =>
subquery.schema(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(),
+ LogicalPlan::Execute(Execute { schema, .. }) => schema,
LogicalPlan::Explain(explain) => &explain.schema,
LogicalPlan::Analyze(analyze) => &analyze.schema,
LogicalPlan::Extension(extension) => extension.node.schema(),
@@ -457,6 +460,7 @@ impl LogicalPlan {
| LogicalPlan::Statement { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
+ | LogicalPlan::Execute { .. }
| LogicalPlan::DescribeTable(_) => vec![],
}
}
@@ -560,6 +564,7 @@ impl LogicalPlan {
LogicalPlan::Subquery(_) => Ok(None),
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Prepare(_)
+ | LogicalPlan::Execute(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Values(_)
| LogicalPlan::Explain(_)
@@ -712,6 +717,7 @@ impl LogicalPlan {
LogicalPlan::Analyze(_) => Ok(self),
LogicalPlan::Explain(_) => Ok(self),
LogicalPlan::Prepare(_) => Ok(self),
+ LogicalPlan::Execute(_) => Ok(self),
LogicalPlan::TableScan(_) => Ok(self),
LogicalPlan::EmptyRelation(_) => Ok(self),
LogicalPlan::Statement(_) => Ok(self),
@@ -1072,6 +1078,14 @@ impl LogicalPlan {
input: Arc::new(input),
}))
}
+ LogicalPlan::Execute(Execute { name, schema, .. }) => {
+ self.assert_no_inputs(inputs)?;
+ Ok(LogicalPlan::Execute(Execute {
+ name: name.clone(),
+ schema: Arc::clone(schema),
+ parameters: expr,
+ }))
+ }
LogicalPlan::TableScan(ts) => {
self.assert_no_inputs(inputs)?;
Ok(LogicalPlan::TableScan(TableScan {
@@ -1330,6 +1344,7 @@ impl LogicalPlan {
| LogicalPlan::Copy(_)
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Prepare(_)
+ | LogicalPlan::Execute(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Extension(_) => None,
}
@@ -1933,6 +1948,9 @@ impl LogicalPlan {
}) => {
write!(f, "Prepare: {name:?} {data_types:?} ")
}
+ LogicalPlan::Execute(Execute { name, parameters, .. }) => {
+ write!(f, "Execute: {} params=[{}]", name,
expr_vec_fmt!(parameters))
+ }
LogicalPlan::DescribeTable(DescribeTable { .. }) => {
write!(f, "DescribeTable")
}
@@ -2599,6 +2617,27 @@ pub struct Prepare {
pub input: Arc<LogicalPlan>,
}
+/// Execute a prepared statement.
+///
+/// Planned from SQL `EXECUTE name(param, ...)`. The node has no inputs; its
+/// only expressions are `parameters`.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Execute {
+    /// The name of the prepared statement to execute
+    pub name: String,
+    /// The execute parameters, in the order they were written
+    pub parameters: Vec<Expr>,
+    /// Placeholder output schema (the SQL planner builds it with
+    /// `DFSchema::empty()`)
+    pub schema: DFSchemaRef,
+}
+
+// Ordering is by `name`, then `parameters`; `schema` is excluded from the
+// ordering and only consulted for equality. When `name` and `parameters` tie
+// but the schemas differ, `None` is returned so that `partial_cmp` stays
+// consistent with the derived `PartialEq` (`Some(Equal)` must imply `==`).
+impl PartialOrd for Execute {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        match self.name.partial_cmp(&other.name) {
+            Some(Ordering::Equal) => self
+                .parameters
+                .partial_cmp(&other.parameters)
+                .filter(|cmp| *cmp != Ordering::Equal || self.schema == other.schema),
+            cmp => cmp,
+        }
+    }
+}
+
/// Describe the schema of table
///
/// # Example output:
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs
b/datafusion/expr/src/logical_plan/tree_node.rs
index 0658f70297..ff2c1ec1d5 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -38,10 +38,10 @@
//! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
use crate::{
dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView,
DdlStatement,
- Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter,
Join, Limit,
- LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery,
Repartition, Sort,
- Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode,
Values,
- Window,
+ Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension,
Filter, Join,
+ Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery,
Repartition,
+ Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest,
UserDefinedLogicalNode,
+ Values, Window,
};
use std::ops::Deref;
use std::sync::Arc;
@@ -363,6 +363,7 @@ impl TreeNode for LogicalPlan {
| LogicalPlan::Statement { .. }
| LogicalPlan::EmptyRelation { .. }
| LogicalPlan::Values { .. }
+ | LogicalPlan::Execute { .. }
| LogicalPlan::DescribeTable(_) => Transformed::no(self),
})
}
@@ -505,6 +506,9 @@ impl LogicalPlan {
.chain(fetch.iter())
.map(|e| e.deref())
.apply_until_stop(f),
+ LogicalPlan::Execute(Execute { parameters, .. }) => {
+ parameters.iter().apply_until_stop(f)
+ }
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::RecursiveQuery(_)
@@ -734,6 +738,20 @@ impl LogicalPlan {
})
})
}
+ LogicalPlan::Execute(Execute {
+ parameters,
+ name,
+ schema,
+ }) => parameters
+ .into_iter()
+ .map_until_stop_and_collect(f)?
+ .update_data(|parameters| {
+ LogicalPlan::Execute(Execute {
+ parameters,
+ name,
+ schema,
+ })
+ }),
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::Unnest(_)
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs
b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index ee9ae9fb15..4fe22d2527 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -553,7 +553,8 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::Copy(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::RecursiveQuery(_)
- | LogicalPlan::Prepare(_) => {
+ | LogicalPlan::Prepare(_)
+ | LogicalPlan::Execute(_) => {
// This rule handles recursion itself in a
`ApplyOrder::TopDown` like
// manner.
plan.map_children(|c| self.rewrite(c, config))?
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs
b/datafusion/optimizer/src/optimize_projections/mod.rs
index 94c04d6328..ec2225bbc0 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -348,7 +348,8 @@ fn optimize_projections(
| LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Values(_)
- | LogicalPlan::DescribeTable(_) => {
+ | LogicalPlan::DescribeTable(_)
+ | LogicalPlan::Execute(_) => {
// These operators have no inputs, so stop the optimization
process.
return Ok(Transformed::no(plan));
}
diff --git a/datafusion/proto/src/logical_plan/mod.rs
b/datafusion/proto/src/logical_plan/mod.rs
index b90ae88aa7..1993598f5c 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1633,6 +1633,9 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::RecursiveQuery(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for RecursiveQuery",
)),
+ LogicalPlan::Execute(_) => Err(proto_error(
+ "LogicalPlan serde is not yet implemented for Execute",
+ )),
}
}
}
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index abb9912b71..00949aa13a 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -48,7 +48,7 @@ use datafusion_expr::{
CreateExternalTable as PlanCreateExternalTable, CreateFunction,
CreateFunctionBody,
CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView,
DescribeTable,
DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView,
EmptyRelation,
- Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
+ Execute, Explain, Expr, ExprSchemable, Filter, LogicalPlan,
LogicalPlanBuilder,
OperateFunctionArg, PlanType, Prepare, SetVariable, SortExpr,
Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
TransactionConclusion, TransactionEnd, TransactionIsolationLevel,
TransactionStart,
@@ -642,6 +642,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: Arc::new(plan),
}))
}
+ Statement::Execute {
+ name,
+ parameters,
+ using,
+ } => {
+ // `USING` is a MySQL-specific syntax and currently not
supported.
+ if !using.is_empty() {
+ return not_impl_err!(
+ "Execute statement with USING is not supported"
+ );
+ }
+
+ let empty_schema = DFSchema::empty();
+ let parameters = parameters
+ .into_iter()
+ .map(|expr| self.sql_to_expr(expr, &empty_schema,
planner_context))
+ .collect::<Result<Vec<Expr>>>()?;
+
+ Ok(LogicalPlan::Execute(Execute {
+ name: ident_to_string(&name),
+ parameters,
+ schema: DFSchemaRef::new(empty_schema),
+ }))
+ }
Statement::ShowTables {
extended,
diff --git a/datafusion/sql/src/unparser/plan.rs
b/datafusion/sql/src/unparser/plan.rs
index 6348aba490..8167ddacff 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -112,6 +112,7 @@ impl Unparser<'_> {
| LogicalPlan::Analyze(_)
| LogicalPlan::Extension(_)
| LogicalPlan::Prepare(_)
+ | LogicalPlan::Execute(_)
| LogicalPlan::Ddl(_)
| LogicalPlan::Copy(_)
| LogicalPlan::DescribeTable(_)
diff --git a/datafusion/sqllogictest/test_files/prepare.slt
b/datafusion/sqllogictest/test_files/prepare.slt
index ce4b7217f9..e306ec7767 100644
--- a/datafusion/sqllogictest/test_files/prepare.slt
+++ b/datafusion/sqllogictest/test_files/prepare.slt
@@ -80,3 +80,21 @@ PREPARE my_plan(INT, DOUBLE, DOUBLE, DOUBLE) AS SELECT id,
SUM(age) FROM person
statement error
PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t
(num, letter);
+
+# test creating logical plan for EXECUTE statements
+query TT
+EXPLAIN EXECUTE my_plan;
+----
+logical_plan Execute: my_plan params=[]
+
+query TT
+EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo');
+----
+logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")]
+
+query error DataFusion error: Schema error: No field named a\.
+EXPLAIN EXECUTE my_plan(a);
+
+# TODO: support EXECUTE queries
+query error DataFusion error: This feature is not implemented: Unsupported
logical plan: Execute
+EXECUTE my_plan;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]