This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 669e0f5  SUBMARINE-638. [Spark-Security]Apply global/local limit after 
masking projection
669e0f5 is described below

commit 669e0f5e562395995c5fa3162dfaf46d003e8a73
Author: sriharsha.tenneti <[email protected]>
AuthorDate: Thu Jan 14 16:27:34 2021 +0800

    SUBMARINE-638. [Spark-Security]Apply global/local limit after masking 
projection
    
    ### What is this PR for?
    This PR optimises limit queries when masking policies are applied in the 
spark-security plugin. It applies the global limit on top of the masking 
projection if there is a limit in the initial plan.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-638
    
    ### How should this be tested?
    https://travis-ci.org/github/harsha-tenneti/submarine/builds/754430620
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: sriharsha.tenneti <[email protected]>
    
    Closes #490 from harsha-tenneti/SUBMARINE-638 and squashes the following 
commits:
    
    c2997f6 [sriharsha.tenneti] Apply global/local limit after masking 
projection
---
 .../catalyst/optimizer/SubmarineDataMaskingExtension.scala  | 13 +++++++++++--
 .../submarine/spark/security/DataMaskingSQLTest.scala       | 12 +++++++++++-
 2 files changed, 22 insertions(+), 3 deletions(-)

diff --git 
a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
index 84c8733..7941930 100644
--- 
a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
+++ 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable, 
HiveTableRelation}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, ExprId, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
AttributeReference, Expression, ExprId, NamedExpression, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import 
org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, 
CreateViewCommand, InsertIntoDataSourceDirCommand}
@@ -233,7 +233,16 @@ case class SubmarineDataMaskingExtension(spark: 
SparkSession) extends Rule[Logic
         case p if hasCatalogTable(p) => SubmarineDataMasking(p)
       }
 
-      marked transformAllExpressions {
+      // Extract global/local limit if any and apply after masking projection
+      val limitExpr: Option[Expression] = plan match {
+        case globalLimit: GlobalLimit => Some(globalLimit.limitExpr)
+        case localLimit: LocalLimit => Some(localLimit.limitExpr)
+        case _ => None
+      }
+
+      val markedWithLimit = if (limitExpr.isDefined) Limit(limitExpr.get, 
marked) else marked
+
+      markedWithLimit transformAllExpressions {
         case s: SubqueryExpression =>
           val SubqueryCompatible(newPlan, _) = SubqueryCompatible(
               SubmarineDataMasking(s.plan), 
SubqueryExpression.hasCorrelatedSubquery(s))
diff --git 
a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
 
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
index b33bd5a..638b62f 100644
--- 
a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
+++ 
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
@@ -21,7 +21,7 @@ package org.apache.submarine.spark.security
 
 import org.apache.commons.codec.digest.DigestUtils
 import org.apache.spark.sql.SubmarineSparkUtils.{enableDataMasking, withUser}
-import org.apache.spark.sql.catalyst.plans.logical.{Project, 
SubmarineDataMasking}
+import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Project, 
SubmarineDataMasking}
 import org.apache.spark.sql.hive.test.TestHive
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
@@ -282,4 +282,14 @@ case class DataMaskingSQLTest() extends FunSuite with 
BeforeAndAfterAll {
       assert(row.getString(1) === "xxx_277", "value shows last 4 characters")
     }
   }
+
+  test("query limit expression") {
+    withUser("bob") {
+      val df = sql("select * from default.src limit 10")
+      
assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty)
+      assert(df.queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
+      val row = df.take(1)(0)
+      assert(row.getString(1).startsWith("x"), "values should be masked")
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to