This is an automated email from the ASF dual-hosted git repository.

jonah pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d2a15b3f8a feat: support logical plan for `EXECUTE` statement (#13194)
d2a15b3f8a is described below

commit d2a15b3f8a9394b4c2bf283de692f73aecc9e17c
Author: Jonah Gao <[email protected]>
AuthorDate: Fri Nov 1 10:51:31 2024 +0800

    feat: support logical plan for `EXECUTE` statement (#13194)
    
    * Add Execute LogicalPlan
    
    * Fix compile
    
    * Add tests
---
 datafusion/core/src/physical_planner.rs            |  3 ++
 datafusion/expr/src/expr_rewriter/mod.rs           |  1 +
 datafusion/expr/src/logical_plan/display.rs        | 17 +++++++---
 datafusion/expr/src/logical_plan/mod.rs            |  2 +-
 datafusion/expr/src/logical_plan/plan.rs           | 39 ++++++++++++++++++++++
 datafusion/expr/src/logical_plan/tree_node.rs      | 26 ++++++++++++---
 .../optimizer/src/common_subexpr_eliminate.rs      |  3 +-
 .../optimizer/src/optimize_projections/mod.rs      |  3 +-
 datafusion/proto/src/logical_plan/mod.rs           |  3 ++
 datafusion/sql/src/statement.rs                    | 26 ++++++++++++++-
 datafusion/sql/src/unparser/plan.rs                |  1 +
 datafusion/sqllogictest/test_files/prepare.slt     | 18 ++++++++++
 12 files changed, 130 insertions(+), 12 deletions(-)

diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs
index 5aa702550b..c16f3ad104 100644
--- a/datafusion/core/src/physical_planner.rs
+++ b/datafusion/core/src/physical_planner.rs
@@ -1201,6 +1201,9 @@ impl DefaultPhysicalPlanner {
                 // statement can be prepared)
                 return not_impl_err!("Unsupported logical plan: Prepare");
             }
+            LogicalPlan::Execute(_) => {
+                return not_impl_err!("Unsupported logical plan: Execute");
+            }
             LogicalPlan::Dml(dml) => {
                 // DataFusion is a read-only query engine, but also a library, so consumers may implement this
                 return not_impl_err!("Unsupported logical plan: Dml({0})", dml.op);
diff --git a/datafusion/expr/src/expr_rewriter/mod.rs b/datafusion/expr/src/expr_rewriter/mod.rs
index d6d5c3e293..c86696854c 100644
--- a/datafusion/expr/src/expr_rewriter/mod.rs
+++ b/datafusion/expr/src/expr_rewriter/mod.rs
@@ -314,6 +314,7 @@ impl NamePreserver {
                     | LogicalPlan::Join(_)
                     | LogicalPlan::TableScan(_)
                     | LogicalPlan::Limit(_)
+                    | LogicalPlan::Execute(_)
             ),
         }
     }
diff --git a/datafusion/expr/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
index c0549451a7..9aea7747c4 100644
--- a/datafusion/expr/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -20,10 +20,10 @@ use std::collections::HashMap;
 use std::fmt;
 
 use crate::{
-    expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
-    Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery,
-    Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
-    Unnest, Values, Window,
+    expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Execute,
+    Expr, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection,
+    RecursiveQuery, Repartition, Sort, Subquery, SubqueryAlias,
+    TableProviderFilterPushDown, TableScan, Unnest, Values, Window,
 };
 
 use crate::dml::CopyTo;
@@ -626,6 +626,15 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
                     "Data Types": format!("{:?}", data_types)
                 })
             }
+            LogicalPlan::Execute(Execute {
+                name, parameters, ..
+            }) => {
+                json!({
+                    "Node Type": "Execute",
+                    "Name": name,
+                    "Parameters": expr_vec_fmt!(parameters),
+                })
+            }
             LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                 json!({
                     "Node Type": "DescribeTable"
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 80a8962124..59654a2278 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -36,7 +36,7 @@ pub use ddl::{
 pub use dml::{DmlStatement, WriteOp};
 pub use plan::{
     projection_schema, Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct,
-    DistinctOn, EmptyRelation, Explain, Extension, FetchType, Filter, Join,
+    DistinctOn, EmptyRelation, Execute, Explain, Extension, FetchType, Filter, Join,
     JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
     Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
     SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 8ba2a44842..191a42e38e 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -266,6 +266,8 @@ pub enum LogicalPlan {
     /// Prepare a statement and find any bind parameters
     /// (e.g. `?`). This is used to implement SQL-prepared statements.
     Prepare(Prepare),
+    /// Execute a prepared statement. This is used to implement SQL 'EXECUTE'.
+    Execute(Execute),
     /// Data Manipulation Language (DML): Insert / Update / Delete
     Dml(DmlStatement),
     /// Data Definition Language (DDL): CREATE / DROP TABLES / VIEWS / SCHEMAS
@@ -314,6 +316,7 @@ impl LogicalPlan {
             LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
             LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
             LogicalPlan::Prepare(Prepare { input, .. }) => input.schema(),
+            LogicalPlan::Execute(Execute { schema, .. }) => schema,
             LogicalPlan::Explain(explain) => &explain.schema,
             LogicalPlan::Analyze(analyze) => &analyze.schema,
             LogicalPlan::Extension(extension) => extension.node.schema(),
@@ -457,6 +460,7 @@ impl LogicalPlan {
             | LogicalPlan::Statement { .. }
             | LogicalPlan::EmptyRelation { .. }
             | LogicalPlan::Values { .. }
+            | LogicalPlan::Execute { .. }
             | LogicalPlan::DescribeTable(_) => vec![],
         }
     }
@@ -560,6 +564,7 @@ impl LogicalPlan {
             LogicalPlan::Subquery(_) => Ok(None),
             LogicalPlan::EmptyRelation(_)
             | LogicalPlan::Prepare(_)
+            | LogicalPlan::Execute(_)
             | LogicalPlan::Statement(_)
             | LogicalPlan::Values(_)
             | LogicalPlan::Explain(_)
@@ -712,6 +717,7 @@ impl LogicalPlan {
             LogicalPlan::Analyze(_) => Ok(self),
             LogicalPlan::Explain(_) => Ok(self),
             LogicalPlan::Prepare(_) => Ok(self),
+            LogicalPlan::Execute(_) => Ok(self),
             LogicalPlan::TableScan(_) => Ok(self),
             LogicalPlan::EmptyRelation(_) => Ok(self),
             LogicalPlan::Statement(_) => Ok(self),
@@ -1072,6 +1078,14 @@ impl LogicalPlan {
                     input: Arc::new(input),
                 }))
             }
+            LogicalPlan::Execute(Execute { name, schema, .. }) => {
+                self.assert_no_inputs(inputs)?;
+                Ok(LogicalPlan::Execute(Execute {
+                    name: name.clone(),
+                    schema: Arc::clone(schema),
+                    parameters: expr,
+                }))
+            }
             LogicalPlan::TableScan(ts) => {
                 self.assert_no_inputs(inputs)?;
                 Ok(LogicalPlan::TableScan(TableScan {
@@ -1330,6 +1344,7 @@ impl LogicalPlan {
             | LogicalPlan::Copy(_)
             | LogicalPlan::DescribeTable(_)
             | LogicalPlan::Prepare(_)
+            | LogicalPlan::Execute(_)
             | LogicalPlan::Statement(_)
             | LogicalPlan::Extension(_) => None,
         }
@@ -1933,6 +1948,9 @@ impl LogicalPlan {
                     }) => {
                         write!(f, "Prepare: {name:?} {data_types:?} ")
                     }
+                    LogicalPlan::Execute(Execute { name, parameters, .. }) => {
+                        write!(f, "Execute: {} params=[{}]", name, expr_vec_fmt!(parameters))
+                    }
                     LogicalPlan::DescribeTable(DescribeTable { .. }) => {
                         write!(f, "DescribeTable")
                     }
@@ -2599,6 +2617,27 @@ pub struct Prepare {
     pub input: Arc<LogicalPlan>,
 }
 
+/// Execute a prepared statement.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct Execute {
+    /// The name of the prepared statement to execute
+    pub name: String,
+    /// The execute parameters
+    pub parameters: Vec<Expr>,
+    /// Dummy schema
+    pub schema: DFSchemaRef,
+}
+
+// Comparison excludes the `schema` field.
+impl PartialOrd for Execute {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        match self.name.partial_cmp(&other.name) {
+            Some(Ordering::Equal) => self.parameters.partial_cmp(&other.parameters),
+            cmp => cmp,
+        }
+    }
+}
+
 /// Describe the schema of table
 ///
 /// # Example output:
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs
index 0658f70297..ff2c1ec1d5 100644
--- a/datafusion/expr/src/logical_plan/tree_node.rs
+++ b/datafusion/expr/src/logical_plan/tree_node.rs
@@ -38,10 +38,10 @@
 //! * [`LogicalPlan::expressions`]: Return a copy of the plan's expressions
 use crate::{
     dml::CopyTo, Aggregate, Analyze, CreateMemoryTable, CreateView, DdlStatement,
-    Distinct, DistinctOn, DmlStatement, Explain, Expr, Extension, Filter, Join, Limit,
-    LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort,
-    Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, Values,
-    Window,
+    Distinct, DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join,
+    Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition,
+    Sort, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode,
+    Values, Window,
 };
 use std::ops::Deref;
 use std::sync::Arc;
@@ -363,6 +363,7 @@ impl TreeNode for LogicalPlan {
             | LogicalPlan::Statement { .. }
             | LogicalPlan::EmptyRelation { .. }
             | LogicalPlan::Values { .. }
+            | LogicalPlan::Execute { .. }
             | LogicalPlan::DescribeTable(_) => Transformed::no(self),
         })
     }
@@ -505,6 +506,9 @@ impl LogicalPlan {
                 .chain(fetch.iter())
                 .map(|e| e.deref())
                 .apply_until_stop(f),
+            LogicalPlan::Execute(Execute { parameters, .. }) => {
+                parameters.iter().apply_until_stop(f)
+            }
             // plans without expressions
             LogicalPlan::EmptyRelation(_)
             | LogicalPlan::RecursiveQuery(_)
@@ -734,6 +738,20 @@ impl LogicalPlan {
                     })
                 })
             }
+            LogicalPlan::Execute(Execute {
+                parameters,
+                name,
+                schema,
+            }) => parameters
+                .into_iter()
+                .map_until_stop_and_collect(f)?
+                .update_data(|parameters| {
+                    LogicalPlan::Execute(Execute {
+                        parameters,
+                        name,
+                        schema,
+                    })
+                }),
             // plans without expressions
             LogicalPlan::EmptyRelation(_)
             | LogicalPlan::Unnest(_)
diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs
index ee9ae9fb15..4fe22d2527 100644
--- a/datafusion/optimizer/src/common_subexpr_eliminate.rs
+++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs
@@ -553,7 +553,8 @@ impl OptimizerRule for CommonSubexprEliminate {
             | LogicalPlan::Copy(_)
             | LogicalPlan::Unnest(_)
             | LogicalPlan::RecursiveQuery(_)
-            | LogicalPlan::Prepare(_) => {
+            | LogicalPlan::Prepare(_)
+            | LogicalPlan::Execute(_) => {
                 // This rule handles recursion itself in a `ApplyOrder::TopDown` like
                 // manner.
                 plan.map_children(|c| self.rewrite(c, config))?
diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs
index 94c04d6328..ec2225bbc0 100644
--- a/datafusion/optimizer/src/optimize_projections/mod.rs
+++ b/datafusion/optimizer/src/optimize_projections/mod.rs
@@ -348,7 +348,8 @@ fn optimize_projections(
         | LogicalPlan::RecursiveQuery(_)
         | LogicalPlan::Statement(_)
         | LogicalPlan::Values(_)
-        | LogicalPlan::DescribeTable(_) => {
+        | LogicalPlan::DescribeTable(_)
+        | LogicalPlan::Execute(_) => {
             // These operators have no inputs, so stop the optimization process.
             return Ok(Transformed::no(plan));
         }
diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs
index b90ae88aa7..1993598f5c 100644
--- a/datafusion/proto/src/logical_plan/mod.rs
+++ b/datafusion/proto/src/logical_plan/mod.rs
@@ -1633,6 +1633,9 @@ impl AsLogicalPlan for LogicalPlanNode {
             LogicalPlan::RecursiveQuery(_) => Err(proto_error(
                 "LogicalPlan serde is not yet implemented for RecursiveQuery",
             )),
+            LogicalPlan::Execute(_) => Err(proto_error(
+                "LogicalPlan serde is not yet implemented for Execute",
+            )),
         }
     }
 }
diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs
index abb9912b71..00949aa13a 100644
--- a/datafusion/sql/src/statement.rs
+++ b/datafusion/sql/src/statement.rs
@@ -48,7 +48,7 @@ use datafusion_expr::{
     CreateExternalTable as PlanCreateExternalTable, CreateFunction, CreateFunctionBody,
     CreateIndex as PlanCreateIndex, CreateMemoryTable, CreateView, DescribeTable,
     DmlStatement, DropCatalogSchema, DropFunction, DropTable, DropView, EmptyRelation,
-    Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
+    Execute, Explain, Expr, ExprSchemable, Filter, LogicalPlan, LogicalPlanBuilder,
     OperateFunctionArg, PlanType, Prepare, SetVariable, SortExpr,
     Statement as PlanStatement, ToStringifiedPlan, TransactionAccessMode,
     TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
@@ -642,6 +642,30 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
                     input: Arc::new(plan),
                 }))
             }
+            Statement::Execute {
+                name,
+                parameters,
+                using,
+            } => {
+                // `USING` is a MySQL-specific syntax and currently not supported.
+                if !using.is_empty() {
+                    return not_impl_err!(
+                        "Execute statement with USING is not supported"
+                    );
+                }
+
+                let empty_schema = DFSchema::empty();
+                let parameters = parameters
+                    .into_iter()
+                    .map(|expr| self.sql_to_expr(expr, &empty_schema, planner_context))
+                    .collect::<Result<Vec<Expr>>>()?;
+
+                Ok(LogicalPlan::Execute(Execute {
+                    name: ident_to_string(&name),
+                    parameters,
+                    schema: DFSchemaRef::new(empty_schema),
+                }))
+            }
 
             Statement::ShowTables {
                 extended,
diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs
index 6348aba490..8167ddacff 100644
--- a/datafusion/sql/src/unparser/plan.rs
+++ b/datafusion/sql/src/unparser/plan.rs
@@ -112,6 +112,7 @@ impl Unparser<'_> {
             | LogicalPlan::Analyze(_)
             | LogicalPlan::Extension(_)
             | LogicalPlan::Prepare(_)
+            | LogicalPlan::Execute(_)
             | LogicalPlan::Ddl(_)
             | LogicalPlan::Copy(_)
             | LogicalPlan::DescribeTable(_)
diff --git a/datafusion/sqllogictest/test_files/prepare.slt b/datafusion/sqllogictest/test_files/prepare.slt
index ce4b7217f9..e306ec7767 100644
--- a/datafusion/sqllogictest/test_files/prepare.slt
+++ b/datafusion/sqllogictest/test_files/prepare.slt
@@ -80,3 +80,21 @@ PREPARE my_plan(INT, DOUBLE, DOUBLE, DOUBLE) AS SELECT id, SUM(age) FROM person
 
 statement error
 PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);
+
+# test creating logical plan for EXECUTE statements
+query TT
+EXPLAIN EXECUTE my_plan;
+----
+logical_plan Execute: my_plan params=[]
+
+query TT
+EXPLAIN EXECUTE my_plan(10*2 + 1, 'Foo');
+----
+logical_plan Execute: my_plan params=[Int64(21), Utf8("Foo")]
+
+query error DataFusion error: Schema error: No field named a\.
+EXPLAIN EXECUTE my_plan(a);
+
+# TODO: support EXECUTE queries
+query error DataFusion error: This feature is not implemented: Unsupported logical plan: Execute
+EXECUTE my_plan;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to