This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 669e0f5 SUBMARINE-638. [Spark-Security]Apply global/local limit after
masking projection
669e0f5 is described below
commit 669e0f5e562395995c5fa3162dfaf46d003e8a73
Author: sriharsha.tenneti <[email protected]>
AuthorDate: Thu Jan 14 16:27:34 2021 +0800
SUBMARINE-638. [Spark-Security]Apply global/local limit after masking
projection
### What is this PR for?
This PR optimises the limit query when masking policies are applied in the
spark-security plugin. It applies the global limit on top of the masking
projection if there is a limit in the initial plan.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-638
### How should this be tested?
https://travis-ci.org/github/harsha-tenneti/submarine/builds/754430620
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Is there breaking changes for older versions? No
* Does this need documentation? No
Author: sriharsha.tenneti <[email protected]>
Closes #490 from harsha-tenneti/SUBMARINE-638 and squashes the following
commits:
c2997f6 [sriharsha.tenneti] Apply global/local limit after masking
projection
---
.../catalyst/optimizer/SubmarineDataMaskingExtension.scala | 13 +++++++++++--
.../submarine/spark/security/DataMaskingSQLTest.scala | 12 +++++++++++-
2 files changed, 22 insertions(+), 3 deletions(-)
diff --git
a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
index 84c8733..7941930 100644
---
a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
+++
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineDataMaskingExtension.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTable,
HiveTableRelation}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, ExprId, NamedExpression, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
AttributeReference, Expression, ExprId, NamedExpression, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule
import
org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand,
CreateViewCommand, InsertIntoDataSourceDirCommand}
@@ -233,7 +233,16 @@ case class SubmarineDataMaskingExtension(spark:
SparkSession) extends Rule[Logic
case p if hasCatalogTable(p) => SubmarineDataMasking(p)
}
- marked transformAllExpressions {
+ // Extract global/local limit if any and apply after masking projection
+ val limitExpr: Option[Expression] = plan match {
+ case globalLimit: GlobalLimit => Some(globalLimit.limitExpr)
+ case localLimit: LocalLimit => Some(localLimit.limitExpr)
+ case _ => None
+ }
+
+ val markedWithLimit = if (limitExpr.isDefined) Limit(limitExpr.get,
marked) else marked
+
+ markedWithLimit transformAllExpressions {
case s: SubqueryExpression =>
val SubqueryCompatible(newPlan, _) = SubqueryCompatible(
SubmarineDataMasking(s.plan),
SubqueryExpression.hasCorrelatedSubquery(s))
diff --git
a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
index b33bd5a..638b62f 100644
---
a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
+++
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/DataMaskingSQLTest.scala
@@ -21,7 +21,7 @@ package org.apache.submarine.spark.security
import org.apache.commons.codec.digest.DigestUtils
import org.apache.spark.sql.SubmarineSparkUtils.{enableDataMasking, withUser}
-import org.apache.spark.sql.catalyst.plans.logical.{Project,
SubmarineDataMasking}
+import org.apache.spark.sql.catalyst.plans.logical.{GlobalLimit, Project,
SubmarineDataMasking}
import org.apache.spark.sql.hive.test.TestHive
import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -282,4 +282,14 @@ case class DataMaskingSQLTest() extends FunSuite with
BeforeAndAfterAll {
assert(row.getString(1) === "xxx_277", "value shows last 4 characters")
}
}
+
+ test("query limit expression") {
+ withUser("bob") {
+ val df = sql("select * from default.src limit 10")
+
assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineDataMasking]).nonEmpty)
+ assert(df.queryExecution.optimizedPlan.isInstanceOf[GlobalLimit])
+ val row = df.take(1)(0)
+ assert(row.getString(1).startsWith("x"), "values should be masked")
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]