This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new ab3f92e SUBMARINE-417. Support Row Filtering
ab3f92e is described below
commit ab3f92eb3b31f7364836e31969692df93c9186d8
Author: Kent Yao <[email protected]>
AuthorDate: Tue Mar 10 13:46:15 2020 +0800
SUBMARINE-417. Support Row Filtering
### What is this PR for?
Add row-level filtering feature to spark-security, which allows
administrators to set system-level filters for end-users.
e.g. if table `src` has a system-level filter on column `key`
(key<=20200309) for user bob
when bob executes `select * from src`, we will apply the filter to the
query, then the final result will be `select * from src where key<=20200309`
### What type of PR is it?
SUBMARINE-417
### Todos
### What is the Jira issue?
* Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE-417
### How should this be tested?
added unit tests
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Kent Yao <[email protected]>
Closes #218 from yaooqinn/SUBMARINE-417 and squashes the following commits:
af513af [Kent Yao] SUBMARINE-417. Support Row Filtering
---
.../optimizer/SubmarineRowFilterExtension.scala | 119 +++++++++++
.../optimizer/SubmarineSparkOptimizer.scala} | 30 +--
.../plans/logical/SubmarineRowFilter.scala} | 18 +-
.../SubmarineSparkPlanOmitStrategy.scala} | 29 ++-
.../spark/security/RangerSparkSQLExtension.scala | 5 +-
.../src/test/resources/ranger-spark-audit.xml | 31 +++
...rkTestUtils.scala => SubmarineSparkUtils.scala} | 10 +-
.../SubmarineRowFilterExtensionTest.scala | 49 +++++
.../optimizer/SubmarineSparkOptimizerTest.scala | 48 +++++
...uthorizerTest.scala => AuthorizationTest.scala} | 38 +---
.../spark/security/RowFilterSQLTest.scala | 217 +++++++++++++++++++++
11 files changed, 521 insertions(+), 73 deletions(-)
diff --git
a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
new file mode 100644
index 0000000..14f634c
--- /dev/null
+++
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.commons.lang.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.ranger.plugin.policyengine.RangerAccessResult
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.AuthzUtils.getFieldVal
+import
org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand,
CreateViewCommand, InsertIntoDataSourceDirCommand}
+import
org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand,
InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand,
InsertIntoHiveDirCommand, InsertIntoHiveTable}
+import org.apache.submarine.spark.security._
+
+/**
+ * An Apache Spark's [[Optimizer]] extension for row level filtering.
+ */
+case class SubmarineRowFilterExtension(spark: SparkSession) extends
Rule[LogicalPlan] {
+ private lazy val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
+ private lazy val rangerSparkOptimizer = new SubmarineSparkOptimizer(spark)
+
+ /**
+ * Transform a Relation to a parsed [[LogicalPlan]] with specified row
filter expressions
+ * @param plan the original [[LogicalPlan]]
+ * @param table a Spark [[CatalogTable]] representation
+ * @return A new Spark [[LogicalPlan]] with specified row filter expressions
+ */
+ private def applyingRowFilterExpr(plan: LogicalPlan, table: CatalogTable):
LogicalPlan = {
+ val auditHandler = new RangerSparkAuditHandler()
+ try {
+ val identifier = table.identifier
+ val resource =
+ RangerSparkResource(SparkObjectType.TABLE, identifier.database,
identifier.table)
+ val ugi = UserGroupInformation.getCurrentUser
+ val request = new RangerSparkAccessRequest(resource,
ugi.getShortUserName,
+ ugi.getGroupNames.toSet, SparkObjectType.TABLE.toString,
SparkAccessType.SELECT,
+ sparkPlugin.getClusterName)
+ val result = sparkPlugin.evalRowFilterPolicies(request, auditHandler)
+ if (isRowFilterEnabled(result)) {
+ val condition =
spark.sessionState.sqlParser.parseExpression(result.getFilterExpr)
+ val analyzed = spark.sessionState.analyzer.execute(Filter(condition,
plan))
+ val optimized = analyzed transformAllExpressions {
+ case s: SubqueryExpression =>
+ val Subquery(newPlan) =
+
rangerSparkOptimizer.execute(Subquery(SubmarineRowFilter(s.plan)))
+ s.withNewPlan(newPlan)
+ }
+ SubmarineRowFilter(optimized)
+ } else {
+ SubmarineRowFilter(plan)
+ }
+ } catch {
+ case e: Exception => throw e
+ }
+ }
+
+ private def isRowFilterEnabled(result: RangerAccessResult): Boolean = {
+ result != null && result.isRowFilterEnabled &&
StringUtils.isNotEmpty(result.getFilterExpr)
+ }
+
+ private def doFiltering(plan: LogicalPlan): LogicalPlan = plan match {
+ case rf: SubmarineRowFilter => rf
+ case fixed if fixed.find(_.isInstanceOf[SubmarineRowFilter]).nonEmpty =>
fixed
+ case _ =>
+ val plansWithTables = plan.collectLeaves().map {
+ case h if h.nodeName == "HiveTableRelation" =>
+ (h, getFieldVal(h, "tableMeta").asInstanceOf[CatalogTable])
+ case m if m.nodeName == "MetastoreRelation" =>
+ (m, getFieldVal(m, "catalogTable").asInstanceOf[CatalogTable])
+ case l: LogicalRelation if l.catalogTable.isDefined =>
+ (l, l.catalogTable.get)
+ case _ => null
+ }.filter(_ != null).map(lt => (lt._1, applyingRowFilterExpr(lt._1,
lt._2))).toMap
+
+ plan transformUp {
+ case p => plansWithTables.getOrElse(p, p)
+ }
+ }
+
+ /**
+ * Transform a spark logical plan to another plan with the row filer
expressions
+ * @param plan the original [[LogicalPlan]]
+ * @return the logical plan with row filer expressions applied
+ */
+ override def apply(plan: LogicalPlan): LogicalPlan = plan match {
+ case c: Command => c match {
+ case c: CreateDataSourceTableAsSelectCommand => c.copy(query =
doFiltering(c.query))
+ case c: CreateHiveTableAsSelectCommand => c.copy(query =
doFiltering(c.query))
+ case c: CreateViewCommand => c.copy(child = doFiltering(c.child))
+ case i: InsertIntoDataSourceCommand => i.copy(query =
doFiltering(i.query))
+ case i: InsertIntoDataSourceDirCommand => i.copy(query =
doFiltering(i.query))
+ case i: InsertIntoHadoopFsRelationCommand => i.copy(query =
doFiltering(i.query))
+ case i: InsertIntoHiveDirCommand => i.copy(query = doFiltering(i.query))
+ case i: InsertIntoHiveTable => i.copy(query = doFiltering(i.query))
+ case s: SaveIntoDataSourceCommand => s.copy(query = doFiltering(s.query))
+ case cmd => cmd
+ }
+ case other => doFiltering(other)
+ }
+}
diff --git
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizer.scala
similarity index 54%
copy from
submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
copy to
submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizer.scala
index 121afe3..d232db6 100644
---
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
+++
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizer.scala
@@ -15,23 +15,23 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.catalyst.optimizer
-import java.security.PrivilegedExceptionAction
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.SparkSession
-import org.apache.hadoop.security.UserGroupInformation
-import
org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension
-
-object RangerSparkTestUtils {
-
- def withUser[T](user: String)(f: => T): T = {
- val ugi = UserGroupInformation.createRemoteUser(user)
- ugi.doAs(new PrivilegedExceptionAction[T] {
- override def run(): T = f
- })
- }
+/**
+ * An Optimizer without all `spark.sql.extensions`
+ */
+class SubmarineSparkOptimizer(spark: SparkSession) extends
RuleExecutor[LogicalPlan] {
- def enableAuthorizer(spark: SparkSession): Unit = {
-
spark.extensions.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
+ override def batches: Seq[Batch] = {
+ val optimizer = spark.sessionState.optimizer
+ val extRules = optimizer.extendedOperatorOptimizationRules
+ optimizer.batches.map { batch =>
+ val ruleSet = batch.rules.toSet -- extRules
+ Batch(batch.name, FixedPoint(batch.strategy.maxIterations),
ruleSet.toSeq: _*)
+ }
}
}
diff --git
a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineRowFilter.scala
similarity index 64%
copy from
submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
copy to
submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineRowFilter.scala
index daf4d03..f61b323 100644
---
a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
+++
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineRowFilter.scala
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,13 +15,15 @@
* limitations under the License.
*/
-package org.apache.submarine.spark.security
+package org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.SparkSessionExtensions
-import
org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension
+import org.apache.spark.sql.catalyst.expressions.Attribute
-class RangerSparkSQLExtension extends Extensions {
- override def apply(ext: SparkSessionExtensions): Unit = {
- ext.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
- }
+/**
+ * A wrapper for a transformed plan with row level filter applied, which will
be removed during
+ * LogicalPlan -> PhysicalPlan
+ *
+ */
+case class SubmarineRowFilter(child: LogicalPlan) extends UnaryNode {
+ override def output: Seq[Attribute] = child.output
}
diff --git
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
similarity index 57%
copy from
submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
copy to
submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
index 121afe3..ae9aad8 100644
---
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
+++
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
@@ -15,23 +15,18 @@
* limitations under the License.
*/
-package org.apache.spark.sql
+package org.apache.spark.sql.execution
-import java.security.PrivilegedExceptionAction
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan,
SubmarineRowFilter}
-import org.apache.hadoop.security.UserGroupInformation
-import
org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension
-
-object RangerSparkTestUtils {
-
- def withUser[T](user: String)(f: => T): T = {
- val ugi = UserGroupInformation.createRemoteUser(user)
- ugi.doAs(new PrivilegedExceptionAction[T] {
- override def run(): T = f
- })
- }
-
- def enableAuthorizer(spark: SparkSession): Unit = {
-
spark.extensions.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
+/**
+ * An Apache Spark's [[Strategy]] extension for omitting marker for row level
filtering and data
+ * masking.
+ */
+case class SubmarineSparkPlanOmitStrategy(spark: SparkSession) extends
Strategy {
+ override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+ case SubmarineRowFilter(child) => planLater(child) :: Nil
+ case _ => Nil
}
-}
+}
\ No newline at end of file
diff --git
a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
index daf4d03..80dc127 100644
---
a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
+++
b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
@@ -18,10 +18,13 @@
package org.apache.submarine.spark.security
import org.apache.spark.sql.SparkSessionExtensions
-import
org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension
+import org.apache.spark.sql.catalyst.optimizer.{SubmarineRowFilterExtension,
SubmarineSparkRangerAuthorizationExtension}
+import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
class RangerSparkSQLExtension extends Extensions {
override def apply(ext: SparkSessionExtensions): Unit = {
ext.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
+ ext.injectOptimizerRule(SubmarineRowFilterExtension)
+ ext.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
}
}
diff --git
a/submarine-security/spark-security/src/test/resources/ranger-spark-audit.xml
b/submarine-security/spark-security/src/test/resources/ranger-spark-audit.xml
new file mode 100644
index 0000000..32721d3
--- /dev/null
+++
b/submarine-security/spark-security/src/test/resources/ranger-spark-audit.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
+
+ <property>
+ <name>xasecure.audit.is.enabled</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <name>xasecure.audit.destination.db</name>
+ <value>false</value>
+ </property>
+
+</configuration>
diff --git
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
similarity index 75%
rename from
submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
rename to
submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
index 121afe3..1472413 100644
---
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
+++
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation
-import
org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension
+import org.apache.spark.sql.catalyst.optimizer.{SubmarineRowFilterExtension,
SubmarineSparkRangerAuthorizationExtension}
+import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
-object RangerSparkTestUtils {
+object SubmarineSparkUtils {
def withUser[T](user: String)(f: => T): T = {
val ugi = UserGroupInformation.createRemoteUser(user)
@@ -34,4 +35,9 @@ object RangerSparkTestUtils {
def enableAuthorizer(spark: SparkSession): Unit = {
spark.extensions.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
}
+
+ def enableRowFilter(spark: SparkSession): Unit = {
+ spark.extensions.injectOptimizerRule(SubmarineRowFilterExtension)
+ spark.extensions.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
+ }
}
diff --git
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtensionTest.scala
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtensionTest.scala
new file mode 100644
index 0000000..805ec3c
--- /dev/null
+++
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtensionTest.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, SubmarineRowFilter}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.SubmarineSparkUtils
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SubmarineRowFilterExtensionTest extends FunSuite with BeforeAndAfterAll {
+
+ private val spark = TestHive.sparkSession.newSession()
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ spark.reset()
+ }
+
+ test("applying condition to orginal query if row filter exists in ranger") {
+ val extension = SubmarineRowFilterExtension(spark)
+ val frame = spark.sql("select * from src")
+ SubmarineSparkUtils.withUser("bob") {
+ val plan = extension.apply(frame.queryExecution.optimizedPlan)
+ assert(plan.collect { case f: Filter => f }.nonEmpty)
+ assert(plan.isInstanceOf[SubmarineRowFilter])
+ }
+
+ SubmarineSparkUtils.withUser("alice") {
+ val plan = extension.apply(frame.queryExecution.optimizedPlan)
+ assert(plan.collect { case f: Filter => f }.isEmpty)
+ assert(plan.isInstanceOf[SubmarineRowFilter])
+ }
+ }
+}
diff --git
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizerTest.scala
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizerTest.scala
new file mode 100644
index 0000000..e3e09b4
--- /dev/null
+++
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizerTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.SubmarineSparkUtils
+import org.apache.spark.sql.execution.SubmarineShowTablesCommand
+import org.apache.spark.sql.execution.command.ShowTablesCommand
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SubmarineSparkOptimizerTest extends FunSuite with BeforeAndAfterAll {
+
+ private val spark = TestHive.sparkSession.newSession()
+
+ private val optimizer = new SubmarineSparkOptimizer(spark)
+
+ private val originPlan = spark.sql("show
tables").queryExecution.optimizedPlan
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ spark.reset()
+ }
+
+ test("SubmarineSparkOptimizer will not do authorization") {
+ SubmarineSparkUtils.enableAuthorizer(spark)
+ val df = spark.sql("show tables")
+ val plan = df.queryExecution.optimizedPlan
+ assert(plan.isInstanceOf[SubmarineShowTablesCommand])
+ val unchanged = optimizer.execute(originPlan)
+ assert(unchanged.isInstanceOf[ShowTablesCommand])
+ }
+
+}
diff --git
a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/SparkRangerAuthorizerTest.scala
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
similarity index 84%
rename from
submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/SparkRangerAuthorizerTest.scala
rename to
submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
index e1373d1..77c7b3e 100644
---
a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/SparkRangerAuthorizerTest.scala
+++
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
@@ -18,47 +18,25 @@
package org.apache.submarine.spark.security
import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.internal.SQLConf
import org.scalatest.{BeforeAndAfterAll, FunSuite}
-class SparkRangerAuthorizerTest extends FunSuite with BeforeAndAfterAll {
+class AuthorizationTest extends FunSuite with BeforeAndAfterAll {
- import org.apache.spark.sql.RangerSparkTestUtils._
- private val spark = TestHive.sparkSession
+ import org.apache.spark.sql.SubmarineSparkUtils._
+ private val spark = TestHive.sparkSession.newSession()
private lazy val sql = spark.sql _
override def beforeAll(): Unit = {
super.beforeAll()
- spark.conf.set(SQLConf.CROSS_JOINS_ENABLED.key, "true")
sql(
"""
- |CREATE TABLE default.rangertbl1 AS SELECT * FROM default.src
+ |CREATE TABLE IF NOT EXISTS default.rangertbl1 AS SELECT * FROM
default.src
""".stripMargin)
sql(
"""
- |CREATE TABLE default.rangertbl2 AS SELECT * FROM default.src
- """.stripMargin)
-
- sql(
- """
- |CREATE TABLE default.rangertbl3 AS SELECT * FROM default.src
- """.stripMargin)
-
- sql(
- """
- |CREATE TABLE default.rangertbl4 AS SELECT * FROM default.src
- """.stripMargin)
-
- sql(
- """
- |CREATE TABLE default.rangertbl5 AS SELECT * FROM default.src
- """.stripMargin)
-
- sql(
- """
- |CREATE TABLE default.rangertbl6 AS SELECT * FROM default.src
+ |CREATE TABLE IF NOT EXISTS default.rangertbl2 AS SELECT * FROM
default.src
""".stripMargin)
sql(
@@ -77,10 +55,10 @@ class SparkRangerAuthorizerTest extends FunSuite with
BeforeAndAfterAll {
}
withUser("alice") {
- assert(sql("show tables").count() === 7)
+ assert(sql("show tables").count() === 3)
}
withUser("bob") {
- assert(sql("show tables").count() === 7)
+ assert(sql("show tables").count() === 3)
}
enableAuthorizer(spark)
@@ -109,7 +87,7 @@ class SparkRangerAuthorizerTest extends FunSuite with
BeforeAndAfterAll {
assert(sql("show tables").count() === 0)
}
withUser("bob") {
- assert(sql("show tables").count() === 7)
+ assert(sql("show tables").count() === 3)
}
}
diff --git
a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RowFilterSQLTest.scala
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RowFilterSQLTest.scala
new file mode 100644
index 0000000..d0024bb
--- /dev/null
+++
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RowFilterSQLTest.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.spark.security
+
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.SubmarineSparkUtils._
+import org.apache.spark.sql.catalyst.plans.logical.{Project,
SubmarineRowFilter}
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class RowFilterSQLTest extends FunSuite with BeforeAndAfterAll {
+
+ private val spark = TestHive.sparkSession.newSession()
+ private lazy val sql = spark.sql _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl1 AS SELECT * FROM
default.src
+ """.stripMargin)
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl2 AS SELECT * FROM
default.src
+ """.stripMargin)
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl3 AS SELECT * FROM
default.src
+ """.stripMargin)
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl4 AS SELECT * FROM
default.src
+ """.stripMargin)
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl5 AS SELECT * FROM
default.src
+ """.stripMargin)
+
+ sql(
+ """
+ |CREATE TABLE IF NOT EXISTS default.rangertbl6 AS SELECT * FROM
default.src
+ """.stripMargin)
+ enableRowFilter(spark)
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ spark.reset()
+ }
+
+
+ test("simple query") {
+ val statement = "select * from default.src"
+ withUser("bob") {
+ val df = sql(statement)
+
assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineRowFilter]).nonEmpty)
+ val row = df.take(1)(0)
+ assert(row.getInt(0) < 20, "keys above 20 should be filtered
automatically")
+ assert(df.count() === 20, "keys above 20 should be filtered
automatically")
+ }
+ withUser("alice") {
+ val df = sql(statement)
+ assert(df.count() === 500)
+ }
+ }
+
+ test("projection with ranger filter key") {
+ val statement = "select key from default.src"
+ withUser("bob") {
+ val df = sql(statement)
+ val row = df.take(1)(0)
+ assert(row.getInt(0) < 20)
+ }
+ withUser("alice") {
+ val df = sql(statement)
+ assert(df.count() === 500)
+ }
+ }
+
+ test("projection without ranger filter key") {
+ val statement = "select value from default.src"
+ withUser("bob") {
+ val df = sql(statement)
+ val row = df.take(1)(0)
+ assert(row.getString(0).split("_")(1).toInt < 20)
+ }
+ withUser("alice") {
+ val df = sql(statement)
+ assert(df.count() === 500)
+ }
+ }
+
+ test("filter with with ranger filter key") {
+ val statement = "select key from default.src where key = 0"
+ val statement2 = "select key from default.src where key >= 20"
+ withUser("bob") {
+ val df = sql(statement)
+ val row = df.take(1)(0)
+ assert(row.getInt(0) === 0)
+ val df2 = sql(statement2)
+ assert(df2.count() === 0, "all keys should be filtered")
+ }
+ withUser("alice") {
+ val df = sql(statement)
+ assert(df.count() === 3)
+ val df2 = sql(statement2)
+ assert(df2.count() === 480)
+ }
+ }
+
+ test("WITH alias") {
+ val statement = "select key as k1, value v1 from default.src"
+ withUser("bob") {
+ val df = sql(statement)
+ val row = df.take(1)(0)
+ assert(row.getInt(0) < 20, "keys above 20 should be filtered
automatically")
+ assert(df.count() === 20, "keys above 20 should be filtered
automatically")
+ }
+ withUser("alice") {
+ val df = sql(statement)
+ assert(df.count() === 500)
+ }
+ }
+
+ test("aggregate") {
+ val statement = "select sum(key) as k1, value v1 from default.src group by
v1"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val row = df.take(1)(0)
+ assert(row.getString(1).split("_")(1).toInt < 20)
+ }
+ withUser("alice") {
+ val df = sql(statement)
+ assert(df.count() === 309)
+ }
+ }
+
+ test("with equal expression") {
+ val statement = "select * from default.rangertbl1"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val row = df.take(1)(0)
+ assert(row.getInt(0) === 0, "rangertbl1 has an internal expression
key=0")
+ }
+ }
+
+ test("with in set") {
+ val statement = "select * from default.rangertbl2"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val row = df.take(1)(0)
+ assert(row.getInt(0) === 0, "rangertbl2 has an internal expression key
in (0, 1, 2)")
+ }
+ }
+
+ test("with in subquery") {
+ val statement = "select * from default.rangertbl3"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val rows = df.collect()
+ assert(rows.forall(_.getInt(0) < 100), "rangertbl3 has an internal
expression key in (query)")
+ }
+ }
+
+ test("with in subquery self joined") {
+ val statement = "select * from default.rangertbl4"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val rows = df.collect()
+ assert(rows.length === 500)
+ }
+ }
+
+ test("with udf") {
+ val statement = "select * from default.rangertbl5"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val rows = df.collect()
+ assert(rows.length === 0)
+ }
+ }
+
+ test("with multiple expressions") {
+ val statement = "select * from default.rangertbl6"
+ withUser("bob") {
+ val df = sql(statement)
+ println(df.queryExecution.optimizedPlan)
+ val rows = df.collect()
+ assert(rows.forall { r => val x = r.getInt(0); x > 1 && x < 10 || x ==
500 })
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]