This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f3722c0af8 Add `SQLOptions` for controlling allowed SQL statements, update docs (#7333)
f3722c0af8 is described below

commit f3722c0af8418bcb19cf9dc5f7e458a3aa5f0f32
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Aug 22 08:43:47 2023 -0400

    Add `SQLOptions` for controlling allowed SQL statements, update docs (#7333)
    
    * Add `SQLOptions` for controlling allowed SQL statements, update docs
    
    * fix docs
---
 datafusion/core/src/execution/context.rs | 263 ++++++++++++++++++++++++-------
 datafusion/core/src/prelude.rs           |   2 +-
 datafusion/core/tests/sql/mod.rs         |   1 +
 datafusion/core/tests/sql/sql_api.rs     | 116 ++++++++++++++
 4 files changed, 326 insertions(+), 56 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index c0a0134fed..c97f770ab3 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -28,7 +28,11 @@ use crate::{
     optimizer::optimizer::Optimizer,
     physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
 };
-use datafusion_common::{alias::AliasGenerator, not_impl_err, plan_err};
+use datafusion_common::{
+    alias::AliasGenerator,
+    not_impl_err, plan_err,
+    tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion},
+};
 use datafusion_execution::registry::SerializerRegistry;
 use datafusion_expr::{
     logical_plan::{DdlStatement, Statement},
@@ -163,12 +167,14 @@ where
 /// * Register a custom data source that can be referenced from a SQL query.
 /// * Execution a SQL query
 ///
+/// # Example: DataFrame API
+///
 /// The following example demonstrates how to use the context to execute a query against a CSV
 /// data source using the DataFrame API:
 ///
 /// ```
 /// use datafusion::prelude::*;
-/// # use datafusion::error::Result;
+/// # use datafusion::{error::Result, assert_batches_eq};
 /// # #[tokio::main]
 /// # async fn main() -> Result<()> {
 /// let ctx = SessionContext::new();
@@ -176,22 +182,49 @@ where
 /// let df = df.filter(col("a").lt_eq(col("b")))?
 ///            .aggregate(vec![col("a")], vec![min(col("b"))])?
 ///            .limit(0, Some(100))?;
-/// let results = df.collect();
+/// let results = df
+///   .collect()
+///   .await?;
+/// assert_batches_eq!(
+///  &[
+///    "+---+----------------+",
+///    "| a | MIN(?table?.b) |",
+///    "+---+----------------+",
+///    "| 1 | 2              |",
+///    "+---+----------------+",
+///  ],
+///  &results
+/// );
 /// # Ok(())
 /// # }
 /// ```
 ///
+/// # Example: SQL API
+///
 /// The following example demonstrates how to execute the same query using SQL:
 ///
 /// ```
 /// use datafusion::prelude::*;
-///
-/// # use datafusion::error::Result;
+/// # use datafusion::{error::Result, assert_batches_eq};
 /// # #[tokio::main]
 /// # async fn main() -> Result<()> {
 /// let mut ctx = SessionContext::new();
 /// ctx.register_csv("example", "tests/data/example.csv", CsvReadOptions::new()).await?;
-/// let results = ctx.sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100").await?;
+/// let results = ctx
+///   .sql("SELECT a, MIN(b) FROM example GROUP BY a LIMIT 100")
+///   .await?
+///   .collect()
+///   .await?;
+/// assert_batches_eq!(
+///  &[
+///    "+---+----------------+",
+///    "| a | MIN(example.b) |",
+///    "+---+----------------+",
+///    "| 1 | 2              |",
+///    "+---+----------------+",
+///  ],
+///  &results
+/// );
 /// # Ok(())
 /// # }
 /// ```
@@ -342,22 +375,82 @@ impl SessionContext {
         self.state.read().config.clone()
     }
 
-    /// Creates a [`DataFrame`] that will execute a SQL query.
+    /// Creates a [`DataFrame`] from SQL query text.
     ///
     /// Note: This API implements DDL statements such as `CREATE TABLE` and
     /// `CREATE VIEW` and DML statements such as `INSERT INTO` with in-memory
-    /// default implementations.
+    /// default implementations. See [`Self::sql_with_options`].
+    ///
+    /// # Example: Running SQL queries
+    ///
+    /// See the example on [`Self`]
     ///
-    /// If this is not desirable, consider using [`SessionState::create_logical_plan()`] which
-    /// does not mutate the state based on such statements.
+    /// # Example: Creating a Table with SQL
+    ///
+    /// ```
+    /// use datafusion::prelude::*;
+    /// # use datafusion::{error::Result, assert_batches_eq};
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = SessionContext::new();
+    /// ctx
+    ///   .sql("CREATE TABLE foo (x INTEGER)")
+    ///   .await?
+    ///   .collect()
+    ///   .await?;
+    /// assert!(ctx.table_exist("foo").unwrap());
+    /// # Ok(())
+    /// # }
+    /// ```
     pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
-        // create a query planner
+        self.sql_with_options(sql, SQLOptions::new()).await
+    }
+
+    /// Creates a [`DataFrame`] from SQL query text, first validating
+    /// that the queries are allowed by `options`
+    ///
+    /// # Example: Preventing Creating a Table with SQL
+    ///
+    /// If you want to avoid creating tables, or modifying data or the
+    /// session, set [`SQLOptions`] appropriately:
+    ///
+    /// ```
+    /// use datafusion::prelude::*;
+    /// # use datafusion::{error::Result};
+    /// # use datafusion::physical_plan::collect;
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<()> {
+    /// let mut ctx = SessionContext::new();
+    /// let options = SQLOptions::new()
+    ///   .with_allow_ddl(false);
+    /// let err = ctx.sql_with_options("CREATE TABLE foo (x INTEGER)", options)
+    ///   .await
+    ///   .unwrap_err();
+    /// assert_eq!(
+    ///   err.to_string(),
+    ///   "Error during planning: DDL not supported: CreateMemoryTable"
+    /// );
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub async fn sql_with_options(
+        &self,
+        sql: &str,
+        options: SQLOptions,
+    ) -> Result<DataFrame> {
         let plan = self.state().create_logical_plan(sql).await?;
+        options.verify_plan(&plan)?;
 
         self.execute_logical_plan(plan).await
     }
 
-    /// Execute the [`LogicalPlan`], return a [`DataFrame`]
+    /// Execute the [`LogicalPlan`], return a [`DataFrame`]. This API
+    /// is not feature limited (so all SQL such as `CREATE TABLE` and
+    /// `COPY` will be run).
+    ///
+    /// If you wish to limit the type of plan that can be run from
+    /// SQL, see [`Self::sql_with_options`] and
+    /// [`SQLOptions::verify_plan`].
     pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result<DataFrame> {
         match plan {
             LogicalPlan::Ddl(ddl) => match ddl {
@@ -1304,7 +1397,7 @@ impl FunctionRegistry for SessionContext {
 /// A planner used to add extensions to DataFusion logical and physical plans.
 #[async_trait]
 pub trait QueryPlanner {
-    /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
+    /// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution
     async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
@@ -1317,7 +1410,7 @@ struct DefaultQueryPlanner {}
 
 #[async_trait]
 impl QueryPlanner for DefaultQueryPlanner {
-    /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution
+    /// Given a `LogicalPlan`, create an [`ExecutionPlan`] suitable for execution
     async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
@@ -1628,7 +1721,8 @@ impl SessionState {
         &mut self.table_factories
     }
 
-    /// Convert a SQL string into an AST Statement
+    /// Parse an SQL string into a DataFusion-specific AST
+    /// [`Statement`]. See [`SessionContext::sql`] for running queries.
     pub fn sql_to_statement(
         &self,
         sql: &str,
@@ -1787,9 +1881,15 @@ impl SessionState {
         query.statement_to_plan(statement)
     }
 
-    /// Creates a [`LogicalPlan`] from the provided SQL string
+    /// Creates a [`LogicalPlan`] from the provided SQL string. This
+    /// interface will plan any SQL DataFusion supports, including DDL
+    /// such as `CREATE TABLE` and DML such as `COPY` (which can write
+    /// to local files).
     ///
-    /// See [`SessionContext::sql`] for a higher-level interface that also handles DDL
+    /// See [`SessionContext::sql`] and
+    /// [`SessionContext::sql_with_options`] for a higher-level
+    /// interface that handles DDL and verification of allowed
+    /// statements.
     pub async fn create_logical_plan(&self, sql: &str) -> Result<LogicalPlan> {
         let dialect = self.config.options().sql_parser.dialect.as_str();
         let statement = self.sql_to_statement(sql, dialect)?;
@@ -1870,7 +1970,11 @@ impl SessionState {
 
     /// Creates a physical plan from a logical plan.
     ///
-    /// Note: this first calls [`Self::optimize`] on the provided plan
+    /// Note: this first calls [`Self::optimize`] on the provided
+    /// plan.
+    ///
+    /// This function will error for [`LogicalPlan`]s such as catalog
+    /// DDL like `CREATE TABLE`, which must be handled by another layer.
     pub async fn create_physical_plan(
         &self,
         logical_plan: &LogicalPlan,
@@ -2095,6 +2199,92 @@ impl SerializerRegistry for EmptySerializerRegistry {
     }
 }
 
+/// Describes which SQL statements can be run.
+///
+/// See [`SessionContext::sql_with_options`] for more details.
+#[derive(Clone, Debug, Copy)]
+pub struct SQLOptions {
+    /// See [`Self::with_allow_ddl`]
+    allow_ddl: bool,
+    /// See [`Self::with_allow_dml`]
+    allow_dml: bool,
+    /// See [`Self::with_allow_statements`]
+    allow_statements: bool,
+}
+
+impl Default for SQLOptions {
+    fn default() -> Self {
+        Self {
+            allow_ddl: true,
+            allow_dml: true,
+            allow_statements: true,
+        }
+    }
+}
+
+impl SQLOptions {
+    /// Create a new `SQLOptions` with default values
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    /// Should DDL data definition commands (e.g. `CREATE TABLE`) be run? Defaults to `true`.
+    pub fn with_allow_ddl(mut self, allow: bool) -> Self {
+        self.allow_ddl = allow;
+        self
+    }
+
+    /// Should DML data modification commands (e.g. `INSERT` and `COPY`) be run? Defaults to `true`.
+    pub fn with_allow_dml(mut self, allow: bool) -> Self {
+        self.allow_dml = allow;
+        self
+    }
+
+    /// Should statements such as `SET VARIABLE` and `BEGIN TRANSACTION` be run? Defaults to `true`.
+    pub fn with_allow_statements(mut self, allow: bool) -> Self {
+        self.allow_statements = allow;
+        self
+    }
+
+    /// Return an error if the [`LogicalPlan`] has any nodes that are
+    /// incompatible with this [`SQLOptions`].
+    pub fn verify_plan(&self, plan: &LogicalPlan) -> Result<()> {
+        plan.visit(&mut BadPlanVisitor::new(self))?;
+        Ok(())
+    }
+}
+
+struct BadPlanVisitor<'a> {
+    options: &'a SQLOptions,
+}
+impl<'a> BadPlanVisitor<'a> {
+    fn new(options: &'a SQLOptions) -> Self {
+        Self { options }
+    }
+}
+
+impl<'a> TreeNodeVisitor for BadPlanVisitor<'a> {
+    type N = LogicalPlan;
+
+    fn pre_visit(&mut self, node: &Self::N) -> Result<VisitRecursion> {
+        match node {
+            LogicalPlan::Ddl(ddl) if !self.options.allow_ddl => {
+                plan_err!("DDL not supported: {}", ddl.name())
+            }
+            LogicalPlan::Dml(dml) if !self.options.allow_dml => {
+                plan_err!("DML not supported: {}", dml.op)
+            }
+            LogicalPlan::Copy(_) if !self.options.allow_dml => {
+                plan_err!("DML not supported: COPY")
+            }
+            LogicalPlan::Statement(stmt) if !self.options.allow_statements => {
+                plan_err!("Statement not supported: {}", stmt.name())
+            }
+            _ => Ok(VisitRecursion::Continue),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -2646,43 +2836,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn unsupported_sql_returns_error() -> Result<()> {
-        let ctx = SessionContext::new();
-        ctx.register_table("test", test::table_with_sequence(1, 1).unwrap())
-            .unwrap();
-        let state = ctx.state();
-
-        // create view
-        let sql = "create view test_view as select * from test";
-        let plan = state.create_logical_plan(sql).await;
-        let physical_plan = state.create_physical_plan(&plan.unwrap()).await;
-        assert!(physical_plan.is_err());
-        assert_eq!(
-            format!("{}", physical_plan.unwrap_err()),
-            "This feature is not implemented: Unsupported logical plan: CreateView"
-        );
-        // // drop view
-        let sql = "drop view test_view";
-        let plan = state.create_logical_plan(sql).await;
-        let physical_plan = state.create_physical_plan(&plan.unwrap()).await;
-        assert!(physical_plan.is_err());
-        assert_eq!(
-            format!("{}", physical_plan.unwrap_err()),
-            "This feature is not implemented: Unsupported logical plan: DropView"
-        );
-        // // drop table
-        let sql = "drop table test";
-        let plan = state.create_logical_plan(sql).await;
-        let physical_plan = state.create_physical_plan(&plan.unwrap()).await;
-        assert!(physical_plan.is_err());
-        assert_eq!(
-            format!("{}", physical_plan.unwrap_err()),
-            "This feature is not implemented: Unsupported logical plan: DropTable"
-        );
-        Ok(())
-    }
-
     struct MyPhysicalPlanner {}
 
     #[async_trait]
diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs
index d01d9c2390..3782feca19 100644
--- a/datafusion/core/src/prelude.rs
+++ b/datafusion/core/src/prelude.rs
@@ -26,7 +26,7 @@
 //! ```
 
 pub use crate::dataframe::DataFrame;
-pub use crate::execution::context::{SessionConfig, SessionContext};
+pub use crate::execution::context::{SQLOptions, SessionConfig, SessionContext};
 pub use crate::execution::options::{
     AvroReadOptions, CsvReadOptions, NdJsonReadOptions, ParquetReadOptions,
 };
diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs
index c1adcf9d0a..35423234db 100644
--- a/datafusion/core/tests/sql/mod.rs
+++ b/datafusion/core/tests/sql/mod.rs
@@ -96,6 +96,7 @@ pub mod projection;
 pub mod references;
 pub mod repartition;
 pub mod select;
+mod sql_api;
 pub mod subqueries;
 pub mod timestamp;
 pub mod udf;
diff --git a/datafusion/core/tests/sql/sql_api.rs b/datafusion/core/tests/sql/sql_api.rs
new file mode 100644
index 0000000000..4f249a8656
--- /dev/null
+++ b/datafusion/core/tests/sql/sql_api.rs
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::prelude::*;
+use tempfile::TempDir;
+
+#[tokio::test]
+async fn unsupported_ddl_returns_error() {
+    // Verify SessionContext::sql_with_options errors appropriately
+    let ctx = SessionContext::new();
+    ctx.sql("CREATE TABLE test (x int)").await.unwrap();
+
+    // disallow ddl
+    let options = SQLOptions::new().with_allow_ddl(false);
+
+    let sql = "create view test_view as select * from test";
+    let df = ctx.sql_with_options(sql, options).await;
+    assert_eq!(
+        df.unwrap_err().to_string(),
+        "Error during planning: DDL not supported: CreateView"
+    );
+
+    // allow ddl
+    let options = options.with_allow_ddl(true);
+    ctx.sql_with_options(sql, options).await.unwrap();
+}
+
+#[tokio::test]
+async fn unsupported_dml_returns_error() {
+    let ctx = SessionContext::new();
+    ctx.sql("CREATE TABLE test (x int)").await.unwrap();
+
+    let options = SQLOptions::new().with_allow_dml(false);
+
+    let sql = "insert into test values (1)";
+    let df = ctx.sql_with_options(sql, options).await;
+    assert_eq!(
+        df.unwrap_err().to_string(),
+        "Error during planning: DML not supported: Insert Into"
+    );
+
+    let options = options.with_allow_dml(true);
+    ctx.sql_with_options(sql, options).await.unwrap();
+}
+
+#[tokio::test]
+async fn unsupported_copy_returns_error() {
+    let tmpdir = TempDir::new().unwrap();
+    let tmpfile = tmpdir.path().join("foo.parquet");
+
+    let ctx = SessionContext::new();
+    ctx.sql("CREATE TABLE test (x int)").await.unwrap();
+
+    let options = SQLOptions::new().with_allow_dml(false);
+
+    let sql = format!("copy (values(1)) to '{}'", tmpfile.to_string_lossy());
+    let df = ctx.sql_with_options(&sql, options).await;
+    assert_eq!(
+        df.unwrap_err().to_string(),
+        "Error during planning: DML not supported: COPY"
+    );
+
+    let options = options.with_allow_dml(true);
+    ctx.sql_with_options(&sql, options).await.unwrap();
+}
+
+#[tokio::test]
+async fn unsupported_statement_returns_error() {
+    let ctx = SessionContext::new();
+    ctx.sql("CREATE TABLE test (x int)").await.unwrap();
+
+    let options = SQLOptions::new().with_allow_statements(false);
+
+    let sql = "set datafusion.execution.batch_size = 5";
+    let df = ctx.sql_with_options(sql, options).await;
+    assert_eq!(
+        df.unwrap_err().to_string(),
+        "Error during planning: Statement not supported: SetVariable"
+    );
+
+    let options = options.with_allow_statements(true);
+    ctx.sql_with_options(sql, options).await.unwrap();
+}
+
+#[tokio::test]
+async fn ddl_can_not_be_planned_by_session_state() {
+    let ctx = SessionContext::new();
+
+    // make a table via SQL
+    ctx.sql("CREATE TABLE test (x int)").await.unwrap();
+
+    let state = ctx.state();
+
+    // can not create a physical plan for catalog DDL
+    let sql = "drop table test";
+    let plan = state.create_logical_plan(sql).await.unwrap();
+    let physical_plan = state.create_physical_plan(&plan).await;
+    assert_eq!(
+        physical_plan.unwrap_err().to_string(),
+        "This feature is not implemented: Unsupported logical plan: DropTable"
+    );
+}

Reply via email to