This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new dce77db316 Add standalone `AnalyzerRule` example that implements row 
level access control (#11089)
dce77db316 is described below

commit dce77db316beb5bf5dc2326c350b2d642edbfed8
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jul 5 08:45:39 2024 -0400

    Add standalone `AnalyzerRule` example that implements row level access 
control (#11089)
    
    * Add standlone example AnalyzerRule
    
    * Apply suggestions from code review
    
    Co-authored-by: Jax Liu <[email protected]>
    
    * update for api change
    
    * Apply suggestions from code review
    
    Co-authored-by: Jonah Gao <[email protected]>
    
    ---------
    
    Co-authored-by: Jax Liu <[email protected]>
    Co-authored-by: Jonah Gao <[email protected]>
---
 datafusion-examples/README.md                 |   1 +
 datafusion-examples/examples/analyzer_rule.rs | 200 ++++++++++++++++++++++++++
 2 files changed, 201 insertions(+)

diff --git a/datafusion-examples/README.md b/datafusion-examples/README.md
index dc92019035..f868a5310c 100644
--- a/datafusion-examples/README.md
+++ b/datafusion-examples/README.md
@@ -47,6 +47,7 @@ cargo run --example dataframe
 - [`advanced_udf.rs`](examples/advanced_udf.rs): Define and invoke a more 
complicated User Defined Scalar Function (UDF)
 - [`advanced_udwf.rs`](examples/advanced_udwf.rs): Define and invoke a more 
complicated User Defined Window Function (UDWF)
 - [`advanced_parquet_index.rs`](examples/advanced_parquet_index.rs): Creates a 
detailed secondary index that covers the contents of several parquet files
+- [`analyzer_rule.rs`](examples/analyzer_rule.rs): Use a custom AnalyzerRule 
to change a query's semantics (row level access control)
 - [`catalog.rs`](examples/catalog.rs): Register the table into a custom catalog
 - [`composed_extension_codec`](examples/composed_extension_codec.rs): Example 
of using multiple extension codecs for serialization / deserialization
 - [`csv_sql_streaming.rs`](examples/csv_sql_streaming.rs): Build and run a 
streaming query plan from a SQL statement against a local CSV file
diff --git a/datafusion-examples/examples/analyzer_rule.rs 
b/datafusion-examples/examples/analyzer_rule.rs
new file mode 100644
index 0000000000..bd067be97b
--- /dev/null
+++ b/datafusion-examples/examples/analyzer_rule.rs
@@ -0,0 +1,200 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray};
+use datafusion::prelude::SessionContext;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::Result;
+use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
+use datafusion_optimizer::analyzer::AnalyzerRule;
+use std::sync::{Arc, Mutex};
+
+/// This example demonstrates how to add your own [`AnalyzerRule`] to
+/// DataFusion.
+///
+/// [`AnalyzerRule`]s transform [`LogicalPlan`]s prior to the DataFusion
+/// optimization process, and can be used to change the plan's semantics (e.g.
+/// output types).
+///
+/// This example shows an `AnalyzerRule` which implements a simplistic of row
+/// level access control scheme by introducing a filter to the query.
+///
+/// See [optimizer_rule.rs] for an example of a optimizer rule
+#[tokio::main]
+pub async fn main() -> Result<()> {
+    // AnalyzerRules run before OptimizerRules.
+    //
+    // DataFusion includes several built in AnalyzerRules for tasks such as 
type
+    // coercion which change the types of expressions in the plan. Add our new
+    // rule to the context to run it during the analysis phase.
+    let rule = Arc::new(RowLevelAccessControl::new());
+    let ctx = SessionContext::new();
+    ctx.add_analyzer_rule(Arc::clone(&rule) as _);
+
+    ctx.register_batch("employee", employee_batch())?;
+
+    // Now, planning any SQL statement also invokes the AnalyzerRule
+    let plan = ctx
+        .sql("SELECT * FROM employee")
+        .await?
+        .into_optimized_plan()?;
+
+    // Printing the query plan shows a filter has been added
+    //
+    // Filter: employee.position = Utf8("Engineer")
+    //   TableScan: employee projection=[name, age, position]
+    println!("Logical Plan:\n\n{}\n", plan.display_indent());
+
+    // Execute the query, and indeed no Manager's are returned
+    //
+    // +-----------+-----+----------+
+    // | name      | age | position |
+    // +-----------+-----+----------+
+    // | Andy      | 11  | Engineer |
+    // | Oleks     | 33  | Engineer |
+    // | Xiangpeng | 55  | Engineer |
+    // +-----------+-----+----------+
+    ctx.sql("SELECT * FROM employee").await?.show().await?;
+
+    // We can now change the access level to "Manager" and see the results
+    //
+    // +----------+-----+----------+
+    // | name     | age | position |
+    // +----------+-----+----------+
+    // | Andrew   | 22  | Manager  |
+    // | Chunchun | 44  | Manager  |
+    // +----------+-----+----------+
+    rule.set_show_position("Manager");
+    ctx.sql("SELECT * FROM employee").await?.show().await?;
+
+    // The filters introduced by our AnalyzerRule are treated the same as any
+    // other filter by the DataFusion optimizer, including predicate push down
+    // (including into scans), simplifications, and similar optimizations.
+    //
+    // For example adding another predicate to the query
+    let plan = ctx
+        .sql("SELECT * FROM employee WHERE age > 30")
+        .await?
+        .into_optimized_plan()?;
+
+    // We can see the DataFusion Optimizer has combined the filters together
+    // when we print out the plan
+    //
+    // Filter: employee.age > Int32(30) AND employee.position = Utf8("Manager")
+    //   TableScan: employee projection=[name, age, position]
+    println!("Logical Plan:\n\n{}\n", plan.display_indent());
+
+    Ok(())
+}
+
+/// Example AnalyzerRule that implements a very basic "row level access
+/// control"
+///
+/// In this case, it adds a filter to the plan that removes all managers from
+/// the result set.
+#[derive(Debug)]
+struct RowLevelAccessControl {
+    /// Models the current access level of the session
+    ///
+    /// This is value of the position column which should be included in the
+    /// result set. It is wrapped in a `Mutex` so we can change it during query
+    show_position: Mutex<String>,
+}
+
+impl RowLevelAccessControl {
+    fn new() -> Self {
+        Self {
+            show_position: Mutex::new("Engineer".to_string()),
+        }
+    }
+
+    /// return the current position to show, as an expression
+    fn show_position(&self) -> Expr {
+        lit(self.show_position.lock().unwrap().clone())
+    }
+
+    /// specifies a different position to show in the result set
+    fn set_show_position(&self, access_level: impl Into<String>) {
+        *self.show_position.lock().unwrap() = access_level.into();
+    }
+}
+
+impl AnalyzerRule for RowLevelAccessControl {
+    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> 
Result<LogicalPlan> {
+        // use the TreeNode API to recursively walk the LogicalPlan tree
+        // and all of its children (inputs)
+        let transfomed_plan = plan.transform(|plan| {
+            // This closure is called for each LogicalPlan node
+            // if it is a Scan node, add a filter to remove all managers
+            if is_employee_table_scan(&plan) {
+                // Use the LogicalPlanBuilder to add a filter to the plan
+                let filter = LogicalPlanBuilder::from(plan)
+                    // Filter Expression: position = <access level>
+                    .filter(col("position").eq(self.show_position()))?
+                    .build()?;
+
+                // `Transformed::yes` signals the plan was changed
+                Ok(Transformed::yes(filter))
+            } else {
+                // `Transformed::no`
+                // signals the plan was not changed
+                Ok(Transformed::no(plan))
+            }
+        })?;
+
+        // the result of calling transform is a `Transformed` structure which
+        // contains
+        //
+        // 1. a flag signaling if any rewrite took place
+        // 2. a flag if the recursion stopped early
+        // 3. The actual transformed data (a LogicalPlan in this case)
+        //
+        // This example does not need the value of either flag, so simply
+        // extract the LogicalPlan "data"
+        Ok(transfomed_plan.data)
+    }
+
+    fn name(&self) -> &str {
+        "table_access"
+    }
+}
+
+fn is_employee_table_scan(plan: &LogicalPlan) -> bool {
+    if let LogicalPlan::TableScan(scan) = plan {
+        scan.table_name.table() == "employee"
+    } else {
+        false
+    }
+}
+
+/// Return a RecordBatch with made up data about fictional employees
+fn employee_batch() -> RecordBatch {
+    let name: ArrayRef = Arc::new(StringArray::from_iter_values([
+        "Andy",
+        "Andrew",
+        "Oleks",
+        "Chunchun",
+        "Xiangpeng",
+    ]));
+    let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22, 33, 44, 55]));
+    let position = Arc::new(StringArray::from_iter_values([
+        "Engineer", "Manager", "Engineer", "Manager", "Engineer",
+    ]));
+    RecordBatch::try_from_iter(vec![("name", name), ("age", age), ("position", 
position)])
+        .unwrap()
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to