This is an automated email from the ASF dual-hosted git repository.

jonah pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 34d9d3a698 feat: basic support for executing prepared statements (#13242)
34d9d3a698 is described below

commit 34d9d3a698626436edc3b2bf3ee59fbd77d8ebb9
Author: Jonah Gao <[email protected]>
AuthorDate: Thu Nov 7 10:11:05 2024 +0800

    feat: basic support for executing prepared statements (#13242)
    
    * feat: basic support for executing prepared statements
    
    * Improve execute_prepared
    
    * Fix tests
    
    * Update doc
    
    * Add test
    
    * Add issue test
    
    * Respect allow_statements option
---
 datafusion/core/src/execution/context/mod.rs   |  84 +++++++++++-
 datafusion/core/src/execution/session_state.rs |  38 +++++-
 datafusion/core/tests/sql/select.rs            |  30 +----
 datafusion/core/tests/sql/sql_api.rs           |  24 ++++
 datafusion/expr/src/logical_plan/plan.rs       |  16 +++
 datafusion/sqllogictest/test_files/prepare.slt | 171 +++++++++++++++++++++----
 6 files changed, 304 insertions(+), 59 deletions(-)

diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 333f83c673..7868a7f9e5 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -42,7 +42,8 @@ use crate::{
     logical_expr::{
         CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction,
         CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable,
-        DropView, LogicalPlan, LogicalPlanBuilder, SetVariable, TableType, UNNAMED_TABLE,
+        DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, SetVariable,
+        TableType, UNNAMED_TABLE,
     },
     physical_expr::PhysicalExpr,
     physical_plan::ExecutionPlan,
@@ -54,9 +55,9 @@ use arrow::record_batch::RecordBatch;
 use arrow_schema::Schema;
 use datafusion_common::{
     config::{ConfigExtension, TableOptions},
-    exec_err, not_impl_err, plan_datafusion_err, plan_err,
+    exec_datafusion_err, exec_err, not_impl_err, plan_datafusion_err, plan_err,
     tree_node::{TreeNodeRecursion, TreeNodeVisitor},
-    DFSchema, SchemaReference, TableReference,
+    DFSchema, ParamValues, ScalarValue, SchemaReference, TableReference,
 };
 use datafusion_execution::registry::SerializerRegistry;
 use datafusion_expr::{
@@ -687,7 +688,31 @@ impl SessionContext {
             LogicalPlan::Statement(Statement::SetVariable(stmt)) => {
                 self.set_variable(stmt).await
             }
-
+            LogicalPlan::Prepare(Prepare {
+                name,
+                input,
+                data_types,
+            }) => {
+                // The number of parameters must match the specified data types length.
+                if !data_types.is_empty() {
+                    let param_names = input.get_parameter_names()?;
+                    if param_names.len() != data_types.len() {
+                        return plan_err!(
+                            "Prepare specifies {} data types but query has {} parameters",
+                            data_types.len(),
+                            param_names.len()
+                        );
+                    }
+                }
+                // Store the unoptimized plan into the session state. Although storing the
+                // optimized plan or the physical plan would be more efficient, doing so is
+                // not currently feasible. This is because `now()` would be optimized to a
+                // constant value, causing each EXECUTE to yield the same result, which is
+                // incorrect behavior.
+                self.state.write().store_prepared(name, data_types, input)?;
+                self.return_empty_dataframe()
+            }
+            LogicalPlan::Execute(execute) => self.execute_prepared(execute),
             plan => Ok(DataFrame::new(self.state(), plan)),
         }
     }
@@ -1088,6 +1113,49 @@ impl SessionContext {
         }
     }
 
+    fn execute_prepared(&self, execute: Execute) -> Result<DataFrame> {
+        let Execute {
+            name, parameters, ..
+        } = execute;
+        let prepared = self.state.read().get_prepared(&name).ok_or_else(|| {
+            exec_datafusion_err!("Prepared statement '{}' does not exist", name)
+        })?;
+
+        // Only allow literals as parameters for now.
+        let mut params: Vec<ScalarValue> = parameters
+            .into_iter()
+            .map(|e| match e {
+                Expr::Literal(scalar) => Ok(scalar),
+                _ => not_impl_err!("Unsupported parameter type: {}", e),
+            })
+            .collect::<Result<_>>()?;
+
+        // If the prepared statement provides data types, cast the params to those types.
+        if !prepared.data_types.is_empty() {
+            if params.len() != prepared.data_types.len() {
+                return exec_err!(
+                    "Prepared statement '{}' expects {} parameters, but {} provided",
+                    name,
+                    prepared.data_types.len(),
+                    params.len()
+                );
+            }
+            params = params
+                .into_iter()
+                .zip(prepared.data_types.iter())
+                .map(|(e, dt)| e.cast_to(dt))
+                .collect::<Result<_>>()?;
+        }
+
+        let params = ParamValues::List(params);
+        let plan = prepared
+            .plan
+            .as_ref()
+            .clone()
+            .replace_params_with_values(&params)?;
+        Ok(DataFrame::new(self.state(), plan))
+    }
+
     /// Registers a variable provider within this context.
     pub fn register_variable(
         &self,
@@ -1705,6 +1773,14 @@ impl<'n, 'a> TreeNodeVisitor<'n> for BadPlanVisitor<'a> {
             LogicalPlan::Statement(stmt) if !self.options.allow_statements => {
                 plan_err!("Statement not supported: {}", stmt.name())
             }
+            // TODO: Implement PREPARE as a LogicalPlan::Statement
+            LogicalPlan::Prepare(_) if !self.options.allow_statements => {
+                plan_err!("Statement not supported: PREPARE")
+            }
+            // TODO: Implement EXECUTE as a LogicalPlan::Statement
+            LogicalPlan::Execute(_) if !self.options.allow_statements => {
+                plan_err!("Statement not supported: EXECUTE")
+            }
             _ => Ok(TreeNodeRecursion::Continue),
         }
     }
diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs
index d50c912dd2..ecb59f7b03 100644
--- a/datafusion/core/src/execution/session_state.rs
+++ b/datafusion/core/src/execution/session_state.rs
@@ -40,7 +40,7 @@ use datafusion_common::display::{PlanType, StringifiedPlan, ToStringifiedPlan};
 use datafusion_common::file_options::file_type::FileType;
 use datafusion_common::tree_node::TreeNode;
 use datafusion_common::{
-    config_err, not_impl_err, plan_datafusion_err, DFSchema, DataFusionError,
+    config_err, exec_err, not_impl_err, plan_datafusion_err, DFSchema, DataFusionError,
     ResolvedTableReference, TableReference,
 };
 use datafusion_execution::config::SessionConfig;
@@ -171,6 +171,9 @@ pub struct SessionState {
     /// It will be invoked on `CREATE FUNCTION` statements.
     /// thus, changing dialect o PostgreSql is required
     function_factory: Option<Arc<dyn FunctionFactory>>,
+    /// Cache logical plans of prepared statements for later execution.
+    /// Key is the prepared statement name.
+    prepared_plans: HashMap<String, Arc<PreparedPlan>>,
 }
 
 impl Debug for SessionState {
@@ -197,6 +200,7 @@ impl Debug for SessionState {
             .field("scalar_functions", &self.scalar_functions)
             .field("aggregate_functions", &self.aggregate_functions)
             .field("window_functions", &self.window_functions)
+            .field("prepared_plans", &self.prepared_plans)
             .finish()
     }
 }
@@ -906,6 +910,29 @@ impl SessionState {
         let udtf = self.table_functions.remove(name);
         Ok(udtf.map(|x| x.function().clone()))
     }
+
+    /// Store the logical plan and the parameter types of a prepared statement.
+    pub(crate) fn store_prepared(
+        &mut self,
+        name: String,
+        data_types: Vec<DataType>,
+        plan: Arc<LogicalPlan>,
+    ) -> datafusion_common::Result<()> {
+        match self.prepared_plans.entry(name) {
+            Entry::Vacant(e) => {
+                e.insert(Arc::new(PreparedPlan { data_types, plan }));
+                Ok(())
+            }
+            Entry::Occupied(e) => {
+                exec_err!("Prepared statement '{}' already exists", e.key())
+            }
+        }
+    }
+
+    /// Get the prepared plan with the given name.
+    pub(crate) fn get_prepared(&self, name: &str) -> Option<Arc<PreparedPlan>> {
+        self.prepared_plans.get(name).map(Arc::clone)
+    }
 }
 
 /// A builder to be used for building [`SessionState`]'s. Defaults will
@@ -1327,6 +1354,7 @@ impl SessionStateBuilder {
             table_factories: table_factories.unwrap_or_default(),
             runtime_env,
             function_factory,
+            prepared_plans: HashMap::new(),
         };
 
         if let Some(file_formats) = file_formats {
@@ -1876,6 +1904,14 @@ impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> {
     }
 }
 
+#[derive(Debug)]
+pub(crate) struct PreparedPlan {
+    /// Data types of the parameters
+    pub(crate) data_types: Vec<DataType>,
+    /// The prepared logical plan
+    pub(crate) plan: Arc<LogicalPlan>,
+}
+
 #[cfg(test)]
 mod tests {
     use super::{SessionContextProvider, SessionStateBuilder};
diff --git a/datafusion/core/tests/sql/select.rs b/datafusion/core/tests/sql/select.rs
index dd660512f3..2e815303e3 100644
--- a/datafusion/core/tests/sql/select.rs
+++ b/datafusion/core/tests/sql/select.rs
@@ -57,7 +57,6 @@ async fn test_named_query_parameters() -> Result<()> {
     let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;
 
     // sql to statement then to logical plan with parameters
-    // c1 defined as UINT32, c2 defined as UInt64
     let results = ctx
         .sql("SELECT c1, c2 FROM test WHERE c1 > $coo AND c1 < $foo")
         .await?
@@ -106,9 +105,9 @@ async fn test_prepare_statement() -> Result<()> {
     let ctx = create_ctx_with_partition(&tmp_dir, partition_count).await?;
 
     // sql to statement then to prepare logical plan with parameters
-    // c1 defined as UINT32, c2 defined as UInt64 but the params are Int32 and Float64
-    let dataframe =
-        ctx.sql("PREPARE my_plan(INT, DOUBLE) AS SELECT c1, c2 FROM test WHERE c1 > $2 AND c1 < $1").await?;
+    let dataframe = ctx
+        .sql("SELECT c1, c2 FROM test WHERE c1 > $2 AND c1 < $1")
+        .await?;
 
     // prepare logical plan to logical plan without parameters
     let param_values = vec![ScalarValue::Int32(Some(3)), ScalarValue::Float64(Some(0.0))];
@@ -156,7 +155,7 @@ async fn prepared_statement_type_coercion() -> Result<()> {
         ("unsigned", Arc::new(unsigned_ints) as ArrayRef),
     ])?;
     ctx.register_batch("test", batch)?;
-    let results = ctx.sql("PREPARE my_plan(BIGINT, INT, TEXT) AS SELECT signed, unsigned FROM test WHERE $1 >= signed AND signed <= $2 AND unsigned = $3")
+    let results = ctx.sql("SELECT signed, unsigned FROM test WHERE $1 >= signed AND signed <= $2 AND unsigned = $3")
         .await?
         .with_param_values(vec![
             ScalarValue::from(1_i64),
@@ -176,27 +175,6 @@ async fn prepared_statement_type_coercion() -> Result<()> {
     Ok(())
 }
 
-#[tokio::test]
-async fn prepared_statement_invalid_types() -> Result<()> {
-    let ctx = SessionContext::new();
-    let signed_ints: Int32Array = vec![-1, 0, 1].into();
-    let unsigned_ints: UInt64Array = vec![1, 2, 3].into();
-    let batch = RecordBatch::try_from_iter(vec![
-        ("signed", Arc::new(signed_ints) as ArrayRef),
-        ("unsigned", Arc::new(unsigned_ints) as ArrayRef),
-    ])?;
-    ctx.register_batch("test", batch)?;
-    let results = ctx
-        .sql("PREPARE my_plan(INT) AS SELECT signed FROM test WHERE signed = $1")
-        .await?
-        .with_param_values(vec![ScalarValue::from("1")]);
-    assert_eq!(
-        results.unwrap_err().strip_backtrace(),
-        "Error during planning: Expected parameter of type Int32, got Utf8 at index 0"
-    );
-    Ok(())
-}
-
 #[tokio::test]
 async fn test_parameter_type_coercion() -> Result<()> {
     let ctx = SessionContext::new();
diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs
index 48f4a66b65..b2ffefa437 100644
--- a/datafusion/core/tests/sql/sql_api.rs
+++ b/datafusion/core/tests/sql/sql_api.rs
@@ -113,6 +113,30 @@ async fn unsupported_statement_returns_error() {
     ctx.sql_with_options(sql, options).await.unwrap();
 }
 
+// Disallow PREPARE and EXECUTE statements if `allow_statements` is false
+#[tokio::test]
+async fn disable_prepare_and_execute_statement() {
+    let ctx = SessionContext::new();
+
+    let prepare_sql = "PREPARE plan(INT) AS SELECT $1";
+    let execute_sql = "EXECUTE plan(1)";
+    let options = SQLOptions::new().with_allow_statements(false);
+    let df = ctx.sql_with_options(prepare_sql, options).await;
+    assert_eq!(
+        df.unwrap_err().strip_backtrace(),
+        "Error during planning: Statement not supported: PREPARE"
+    );
+    let df = ctx.sql_with_options(execute_sql, options).await;
+    assert_eq!(
+        df.unwrap_err().strip_backtrace(),
+        "Error during planning: Statement not supported: EXECUTE"
+    );
+
+    let options = options.with_allow_statements(true);
+    ctx.sql_with_options(prepare_sql, options).await.unwrap();
+    ctx.sql_with_options(execute_sql, options).await.unwrap();
+}
+
 #[tokio::test]
 async fn empty_statement_returns_error() {
     let ctx = SessionContext::new();
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index ea8fca3ec9..db309d9b52 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -1440,6 +1440,22 @@ impl LogicalPlan {
         .map(|res| res.data)
     }
 
+    /// Walk the logical plan, find any `Placeholder` tokens, and return a set of their names.
+    pub fn get_parameter_names(&self) -> Result<HashSet<String>> {
+        let mut param_names = HashSet::new();
+        self.apply_with_subqueries(|plan| {
+            plan.apply_expressions(|expr| {
+                expr.apply(|expr| {
+                    if let Expr::Placeholder(Placeholder { id, .. }) = expr {
+                        param_names.insert(id.clone());
+                    }
+                    Ok(TreeNodeRecursion::Continue)
+                })
+            })
+        })
+        .map(|_| param_names)
+    }
+
     /// Walk the logical plan, find any `Placeholder` tokens, and return a map of their IDs and DataTypes
     pub fn get_parameter_types(
         &self,
diff --git a/datafusion/sqllogictest/test_files/prepare.slt b/datafusion/sqllogictest/test_files/prepare.slt
index 91b925efa2..b0c67af9e1 100644
--- a/datafusion/sqllogictest/test_files/prepare.slt
+++ b/datafusion/sqllogictest/test_files/prepare.slt
@@ -30,56 +30,175 @@ select * from person;
 # Error due to syntax and semantic violation
 
 # Syntax error: no name specified after the keyword prepare
-statement error
+statement error DataFusion error: SQL error: ParserError
 PREPARE AS SELECT id, age  FROM person WHERE age = $foo;
 
 # param following a non-number, $foo, not supported
-statement error
+statement error Invalid placeholder, not a number: \$foo
 PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age = $foo;
 
 # not specify table hence cannot specify columns
-statement error
+statement error Schema error: No field named id
 PREPARE my_plan(INT) AS SELECT id + $1;
 
 # not specify data types for all params
-statement error
+statement error Prepare specifies 1 data types but query has 2 parameters
 PREPARE my_plan(INT) AS SELECT 1 + $1 + $2;
 
+# specify too many data types for params
+statement error Prepare specifies 2 data types but query has 1 parameters
+PREPARE my_plan(INT, INT) AS SELECT 1 + $1;
+
 # cannot use IS param
-statement error
+statement error SQL error: ParserError
 PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age is $1;
 
+# TODO: allow prepare without specifying data types
+statement error Placeholder type could not be resolved
+PREPARE my_plan AS SELECT $1;
+
 # #######################
-# TODO: all the errors below should work ok after we store the prepare logical plan somewhere
-statement error
+# Test prepare and execute statements
+
+# execute a non-existing plan
+statement error Prepared statement \'my_plan\' does not exist
+EXECUTE my_plan('Foo', 'Bar');
+
+statement ok
 PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);
 
-statement error
+query IT
+EXECUTE my_plan('Foo', 'Bar');
+----
+1 Foo
+2 Bar
+
+# duplicate prepare statement
+statement error Prepared statement \'my_plan\' already exists
+PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);
+
+statement error Prepare specifies 1 data types but query has 0 parameters
 PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age = 10;
 
-statement error
-PREPARE my_plan AS SELECT id, age  FROM person WHERE age = 10;
+# prepare statement has no params
+statement ok
+PREPARE my_plan2 AS SELECT id, age FROM person WHERE age = 20;
+
+query II
+EXECUTE my_plan2;
+----
+1 20
+
+statement ok
+PREPARE my_plan3(INT) AS SELECT $1;
 
-statement error
-PREPARE my_plan(INT) AS SELECT $1;
+query I
+EXECUTE my_plan3(10);
+----
+10
 
-statement error
-PREPARE my_plan(INT) AS SELECT 1 + $1;
+statement ok
+PREPARE my_plan4(INT) AS SELECT 1 + $1;
 
-statement error
-PREPARE my_plan(INT, DOUBLE) AS SELECT 1 + $1 + $2;
+query I
+EXECUTE my_plan4(10);
+----
+11
 
-statement error
-PREPARE my_plan(INT) AS SELECT id, age  FROM person WHERE age = $1;
+statement ok
+PREPARE my_plan5(INT, DOUBLE) AS SELECT 1 + $1 + $2;
 
-statement error
-PREPARE my_plan(INT, STRING, DOUBLE, INT, DOUBLE, STRING) AS SELECT id, age, $6 FROM person WHERE age IN ($1, $4) AND salary > $3 and salary < $5 OR first_name < $2";
+query R
+EXECUTE my_plan5(10, 20.5);
+----
+31.5
 
-statement error
-PREPARE my_plan(INT, DOUBLE, DOUBLE, DOUBLE) AS SELECT id, SUM(age) FROM person WHERE salary > $2 GROUP BY id HAVING sum(age) < $1 AND SUM(age) > 10 OR SUM(age) in ($3, $4);
+statement ok
+PREPARE my_plan6(INT) AS SELECT id, age FROM person WHERE age = $1;
+
+query II
+EXECUTE my_plan6(20);
+----
+1 20
+
+# EXECUTE param is a different type but compatible
+query II
+EXECUTE my_plan6('20');
+----
+1 20
+
+query II
+EXECUTE my_plan6(20.0);
+----
+1 20
+
+# invalid execute param
+statement error Cast error: Cannot cast string 'foo' to value of Int32 type
+EXECUTE my_plan6('foo');
+
+# TODO: support non-literal expressions
+statement error Unsupported parameter type
+EXECUTE my_plan6(10 + 20);
+
+statement ok
+PREPARE my_plan7(INT, STRING, DOUBLE, INT, DOUBLE, STRING)
+    AS
+SELECT id, age, $6 FROM person WHERE age IN ($1, $4) AND salary > $3 and salary < $5 OR first_name < $2;
+
+query IIT
+EXECUTE my_plan7(10, 'jane', 99999.45, 20, 200000.45, 'foo');
+----
+1 20 foo
+
+statement ok
+PREPARE my_plan8(INT, DOUBLE, DOUBLE, DOUBLE)
+    AS
+SELECT id, SUM(age) FROM person WHERE salary > $2 GROUP BY id
+    HAVING sum(age) < $1 AND SUM(age) > 10 OR SUM(age) in ($3, $4);
+
+query II
+EXECUTE my_plan8(100000, 99999.45, 100000.45, 200000.45);
+----
+1 20
+
+statement ok
+PREPARE my_plan9(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);
+
+query IT
+EXECUTE my_plan9('Foo', 'Bar');
+----
+1 Foo
+2 Bar
+
+
+# Test issue: https://github.com/apache/datafusion/issues/12294
+# prepare argument is in the LIMIT clause
+statement ok
+CREATE TABLE test(id INT, run_id TEXT) AS VALUES(1, 'foo'), (1, 'foo'), (3, 'bar');
+
+statement ok
+PREPARE get_N_rand_ints_from_last_run(INT) AS
+SELECT id
+FROM
+    "test"
+WHERE run_id = 'foo'
+ORDER BY random()
+LIMIT $1
+
+query I
+EXECUTE get_N_rand_ints_from_last_run(1);
+----
+1
+
+query I
+EXECUTE get_N_rand_ints_from_last_run(2);
+----
+1
+1
+
+statement ok
+DROP TABLE test;
 
-statement error
-PREPARE my_plan(STRING, STRING) AS SELECT * FROM (VALUES(1, $1), (2, $2)) AS t (num, letter);
 
 # test creating logical plan for EXECUTE statements
 query TT
@@ -96,7 +215,3 @@ physical_plan_error This feature is not implemented: Unsupported logical plan: E
 
 query error DataFusion error: Schema error: No field named a\.
 EXPLAIN EXECUTE my_plan(a);
-
-# TODO: support EXECUTE queries
-query error DataFusion error: This feature is not implemented: Unsupported logical plan: Execute
-EXECUTE my_plan;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to