This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new ab3f92e  SUBMARINE-417. Support Row Filtering
ab3f92e is described below

commit ab3f92eb3b31f7364836e31969692df93c9186d8
Author: Kent Yao <[email protected]>
AuthorDate: Tue Mar 10 13:46:15 2020 +0800

    SUBMARINE-417. Support Row Filtering
    
    ### What is this PR for?
    Add a row-level filtering feature to spark-security, which enables 
administrators to set system-level filters for end-users.
    
    e.g. if table `src` has a system-level filter on column `key` 
(key<=20200309) for user bob
    
    when bob executes `select * from src`, we will apply the filter to the 
query, then the final result will be `select * from src where key<=20200309`
    
    ### What type of PR is it?
    
    SUBMARINE-417
    
    ### Todos
    
    ### What is the Jira issue?
    * Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE-417
    
    ### How should this be tested?
    
    added unit tests
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Are there any breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Kent Yao <[email protected]>
    
    Closes #218 from yaooqinn/SUBMARINE-417 and squashes the following commits:
    
    af513af [Kent Yao] SUBMARINE-417. Support Row Filtering
---
 .../optimizer/SubmarineRowFilterExtension.scala    | 119 +++++++++++
 .../optimizer/SubmarineSparkOptimizer.scala}       |  30 +--
 .../plans/logical/SubmarineRowFilter.scala}        |  18 +-
 .../SubmarineSparkPlanOmitStrategy.scala}          |  29 ++-
 .../spark/security/RangerSparkSQLExtension.scala   |   5 +-
 .../src/test/resources/ranger-spark-audit.xml      |  31 +++
 ...rkTestUtils.scala => SubmarineSparkUtils.scala} |  10 +-
 .../SubmarineRowFilterExtensionTest.scala          |  49 +++++
 .../optimizer/SubmarineSparkOptimizerTest.scala    |  48 +++++
 ...uthorizerTest.scala => AuthorizationTest.scala} |  38 +---
 .../spark/security/RowFilterSQLTest.scala          | 217 +++++++++++++++++++++
 11 files changed, 521 insertions(+), 73 deletions(-)

diff --git 
a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
new file mode 100644
index 0000000..14f634c
--- /dev/null
+++ 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtension.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.commons.lang.StringUtils
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.ranger.plugin.policyengine.RangerAccessResult
+import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.AuthzUtils.getFieldVal
+import 
org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, 
CreateViewCommand, InsertIntoDataSourceDirCommand}
+import 
org.apache.spark.sql.execution.datasources.{InsertIntoDataSourceCommand, 
InsertIntoHadoopFsRelationCommand, LogicalRelation, SaveIntoDataSourceCommand}
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, 
InsertIntoHiveDirCommand, InsertIntoHiveTable}
+import org.apache.submarine.spark.security._
+
+/**
+ * An Apache Spark's [[Optimizer]] extension for row level filtering.
+ */
+case class SubmarineRowFilterExtension(spark: SparkSession) extends 
Rule[LogicalPlan] {
+  private lazy val sparkPlugin = RangerSparkPlugin.build().getOrCreate()
+  private lazy val rangerSparkOptimizer = new SubmarineSparkOptimizer(spark)
+
+  /**
+   * Transform a Relation to a parsed [[LogicalPlan]] with specified row 
filter expressions
+   * @param plan the original [[LogicalPlan]]
+   * @param table a Spark [[CatalogTable]] representation
+   * @return A new Spark [[LogicalPlan]] with specified row filter expressions
+   */
+  private def applyingRowFilterExpr(plan: LogicalPlan, table: CatalogTable): 
LogicalPlan = {
+    val auditHandler = new RangerSparkAuditHandler()
+    try {
+      val identifier = table.identifier
+      val resource =
+        RangerSparkResource(SparkObjectType.TABLE, identifier.database, 
identifier.table)
+      val ugi = UserGroupInformation.getCurrentUser
+      val request = new RangerSparkAccessRequest(resource, 
ugi.getShortUserName,
+        ugi.getGroupNames.toSet, SparkObjectType.TABLE.toString, 
SparkAccessType.SELECT,
+        sparkPlugin.getClusterName)
+      val result = sparkPlugin.evalRowFilterPolicies(request, auditHandler)
+      if (isRowFilterEnabled(result)) {
+        val condition = 
spark.sessionState.sqlParser.parseExpression(result.getFilterExpr)
+        val analyzed = spark.sessionState.analyzer.execute(Filter(condition, 
plan))
+        val optimized = analyzed transformAllExpressions {
+          case s: SubqueryExpression =>
+            val Subquery(newPlan) =
+              
rangerSparkOptimizer.execute(Subquery(SubmarineRowFilter(s.plan)))
+            s.withNewPlan(newPlan)
+        }
+        SubmarineRowFilter(optimized)
+      } else {
+        SubmarineRowFilter(plan)
+      }
+    } catch {
+      case e: Exception => throw e
+    }
+  }
+
+  private def isRowFilterEnabled(result: RangerAccessResult): Boolean = {
+    result != null && result.isRowFilterEnabled && 
StringUtils.isNotEmpty(result.getFilterExpr)
+  }
+
+  private def doFiltering(plan: LogicalPlan): LogicalPlan = plan match {
+    case rf: SubmarineRowFilter => rf
+    case fixed if fixed.find(_.isInstanceOf[SubmarineRowFilter]).nonEmpty => 
fixed
+    case _ =>
+      val plansWithTables = plan.collectLeaves().map {
+        case h if h.nodeName == "HiveTableRelation" =>
+          (h, getFieldVal(h, "tableMeta").asInstanceOf[CatalogTable])
+        case m if m.nodeName == "MetastoreRelation" =>
+          (m, getFieldVal(m, "catalogTable").asInstanceOf[CatalogTable])
+        case l: LogicalRelation if l.catalogTable.isDefined =>
+          (l, l.catalogTable.get)
+        case _ => null
+      }.filter(_ != null).map(lt => (lt._1, applyingRowFilterExpr(lt._1, 
lt._2))).toMap
+
+      plan transformUp {
+        case p => plansWithTables.getOrElse(p, p)
+      }
+  }
+
+  /**
+   * Transform a spark logical plan to another plan with the row filer 
expressions
+   * @param plan the original [[LogicalPlan]]
+   * @return the logical plan with row filer expressions applied
+   */
+  override def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case c: Command => c match {
+      case c: CreateDataSourceTableAsSelectCommand => c.copy(query = 
doFiltering(c.query))
+      case c: CreateHiveTableAsSelectCommand => c.copy(query = 
doFiltering(c.query))
+      case c: CreateViewCommand => c.copy(child = doFiltering(c.child))
+      case i: InsertIntoDataSourceCommand => i.copy(query = 
doFiltering(i.query))
+      case i: InsertIntoDataSourceDirCommand => i.copy(query = 
doFiltering(i.query))
+      case i: InsertIntoHadoopFsRelationCommand => i.copy(query = 
doFiltering(i.query))
+      case i: InsertIntoHiveDirCommand => i.copy(query = doFiltering(i.query))
+      case i: InsertIntoHiveTable => i.copy(query = doFiltering(i.query))
+      case s: SaveIntoDataSourceCommand => s.copy(query = doFiltering(s.query))
+      case cmd => cmd
+    }
+    case other => doFiltering(other)
+  }
+}
diff --git 
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizer.scala
similarity index 54%
copy from 
submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
copy to 
submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizer.scala
index 121afe3..d232db6 100644
--- 
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
+++ 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizer.scala
@@ -15,23 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
+package org.apache.spark.sql.catalyst.optimizer
 
-import java.security.PrivilegedExceptionAction
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.SparkSession
 
-import org.apache.hadoop.security.UserGroupInformation
-import 
org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension
-
-object RangerSparkTestUtils {
-
-  def withUser[T](user: String)(f: => T): T = {
-    val ugi = UserGroupInformation.createRemoteUser(user)
-    ugi.doAs(new PrivilegedExceptionAction[T] {
-      override def run(): T = f
-    })
-  }
+/**
+ * An Optimizer without all `spark.sql.extensions`
+ */
+class SubmarineSparkOptimizer(spark: SparkSession) extends 
RuleExecutor[LogicalPlan] {
 
-  def enableAuthorizer(spark: SparkSession): Unit = {
-    
spark.extensions.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
+  override def batches: Seq[Batch] = {
+    val optimizer = spark.sessionState.optimizer
+    val extRules = optimizer.extendedOperatorOptimizationRules
+    optimizer.batches.map { batch =>
+      val ruleSet = batch.rules.toSet -- extRules
+      Batch(batch.name, FixedPoint(batch.strategy.maxIterations), 
ruleSet.toSeq: _*)
+    }
   }
 }
diff --git 
a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineRowFilter.scala
similarity index 64%
copy from 
submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
copy to 
submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineRowFilter.scala
index daf4d03..f61b323 100644
--- 
a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
+++ 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SubmarineRowFilter.scala
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *   http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,13 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.submarine.spark.security
+package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.SparkSessionExtensions
-import 
org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension
+import org.apache.spark.sql.catalyst.expressions.Attribute
 
-class RangerSparkSQLExtension extends Extensions {
-  override def apply(ext: SparkSessionExtensions): Unit = {
-    ext.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
-  }
+/**
+ * A wrapper for a transformed plan with row level filter applied, which will 
be removed during
+ * LogicalPlan -> PhysicalPlan
+ *
+ */
+case class SubmarineRowFilter(child: LogicalPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
 }
diff --git 
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
similarity index 57%
copy from 
submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
copy to 
submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
index 121afe3..ae9aad8 100644
--- 
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
+++ 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/execution/SubmarineSparkPlanOmitStrategy.scala
@@ -15,23 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
+package org.apache.spark.sql.execution
 
-import java.security.PrivilegedExceptionAction
+import org.apache.spark.sql.{SparkSession, Strategy}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
SubmarineRowFilter}
 
-import org.apache.hadoop.security.UserGroupInformation
-import 
org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension
-
-object RangerSparkTestUtils {
-
-  def withUser[T](user: String)(f: => T): T = {
-    val ugi = UserGroupInformation.createRemoteUser(user)
-    ugi.doAs(new PrivilegedExceptionAction[T] {
-      override def run(): T = f
-    })
-  }
-
-  def enableAuthorizer(spark: SparkSession): Unit = {
-    
spark.extensions.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
+/**
+ * An Apache Spark's [[Strategy]] extension for omitting marker for row level 
filtering and data
+ * masking.
+ */
+case class SubmarineSparkPlanOmitStrategy(spark: SparkSession) extends 
Strategy {
+  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+    case SubmarineRowFilter(child) => planLater(child) :: Nil
+    case _ => Nil
   }
-}
+}
\ No newline at end of file
diff --git 
a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
 
b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
index daf4d03..80dc127 100644
--- 
a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
+++ 
b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/RangerSparkSQLExtension.scala
@@ -18,10 +18,13 @@
 package org.apache.submarine.spark.security
 
 import org.apache.spark.sql.SparkSessionExtensions
-import 
org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension
+import org.apache.spark.sql.catalyst.optimizer.{SubmarineRowFilterExtension, 
SubmarineSparkRangerAuthorizationExtension}
+import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
 
 class RangerSparkSQLExtension extends Extensions {
   override def apply(ext: SparkSessionExtensions): Unit = {
     ext.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
+    ext.injectOptimizerRule(SubmarineRowFilterExtension)
+    ext.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
   }
 }
diff --git 
a/submarine-security/spark-security/src/test/resources/ranger-spark-audit.xml 
b/submarine-security/spark-security/src/test/resources/ranger-spark-audit.xml
new file mode 100644
index 0000000..32721d3
--- /dev/null
+++ 
b/submarine-security/spark-security/src/test/resources/ranger-spark-audit.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements. See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License. You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<configuration xmlns:xi="http://www.w3.org/2001/XInclude";>
+
+  <property>
+    <name>xasecure.audit.is.enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>xasecure.audit.destination.db</name>
+    <value>false</value>
+  </property>
+
+</configuration>
diff --git 
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
 
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
similarity index 75%
rename from 
submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
rename to 
submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
index 121afe3..1472413 100644
--- 
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/RangerSparkTestUtils.scala
+++ 
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
@@ -20,9 +20,10 @@ package org.apache.spark.sql
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.security.UserGroupInformation
-import 
org.apache.spark.sql.catalyst.optimizer.SubmarineSparkRangerAuthorizationExtension
+import org.apache.spark.sql.catalyst.optimizer.{SubmarineRowFilterExtension, 
SubmarineSparkRangerAuthorizationExtension}
+import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
 
-object RangerSparkTestUtils {
+object SubmarineSparkUtils {
 
   def withUser[T](user: String)(f: => T): T = {
     val ugi = UserGroupInformation.createRemoteUser(user)
@@ -34,4 +35,9 @@ object RangerSparkTestUtils {
   def enableAuthorizer(spark: SparkSession): Unit = {
     
spark.extensions.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
   }
+
+  def enableRowFilter(spark: SparkSession): Unit = {
+    spark.extensions.injectOptimizerRule(SubmarineRowFilterExtension)
+    spark.extensions.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
+  }
 }
diff --git 
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtensionTest.scala
 
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtensionTest.scala
new file mode 100644
index 0000000..805ec3c
--- /dev/null
+++ 
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineRowFilterExtensionTest.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, SubmarineRowFilter}
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.SubmarineSparkUtils
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SubmarineRowFilterExtensionTest extends FunSuite with BeforeAndAfterAll {
+
+  private val spark = TestHive.sparkSession.newSession()
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    spark.reset()
+  }
+
+  test("applying condition to orginal query if row filter exists in ranger") {
+    val extension = SubmarineRowFilterExtension(spark)
+    val frame = spark.sql("select * from src")
+    SubmarineSparkUtils.withUser("bob") {
+      val plan = extension.apply(frame.queryExecution.optimizedPlan)
+      assert(plan.collect { case f: Filter => f }.nonEmpty)
+      assert(plan.isInstanceOf[SubmarineRowFilter])
+    }
+
+    SubmarineSparkUtils.withUser("alice") {
+      val plan = extension.apply(frame.queryExecution.optimizedPlan)
+      assert(plan.collect { case f: Filter => f }.isEmpty)
+      assert(plan.isInstanceOf[SubmarineRowFilter])
+    }
+  }
+}
diff --git 
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizerTest.scala
 
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizerTest.scala
new file mode 100644
index 0000000..e3e09b4
--- /dev/null
+++ 
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarineSparkOptimizerTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.SubmarineSparkUtils
+import org.apache.spark.sql.execution.SubmarineShowTablesCommand
+import org.apache.spark.sql.execution.command.ShowTablesCommand
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SubmarineSparkOptimizerTest extends FunSuite with BeforeAndAfterAll {
+
+  private val spark = TestHive.sparkSession.newSession()
+
+  private val optimizer = new SubmarineSparkOptimizer(spark)
+
+  private val originPlan = spark.sql("show 
tables").queryExecution.optimizedPlan
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    spark.reset()
+  }
+
+  test("SubmarineSparkOptimizer will not do authorization") {
+    SubmarineSparkUtils.enableAuthorizer(spark)
+    val df = spark.sql("show tables")
+    val plan = df.queryExecution.optimizedPlan
+    assert(plan.isInstanceOf[SubmarineShowTablesCommand])
+    val unchanged = optimizer.execute(originPlan)
+    assert(unchanged.isInstanceOf[ShowTablesCommand])
+  }
+
+}
diff --git 
a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/SparkRangerAuthorizerTest.scala
 
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
similarity index 84%
rename from 
submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/SparkRangerAuthorizerTest.scala
rename to 
submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
index e1373d1..77c7b3e 100644
--- 
a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/SparkRangerAuthorizerTest.scala
+++ 
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/AuthorizationTest.scala
@@ -18,47 +18,25 @@
 package org.apache.submarine.spark.security
 
 import org.apache.spark.sql.hive.test.TestHive
-import org.apache.spark.sql.internal.SQLConf
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
-class SparkRangerAuthorizerTest extends FunSuite with BeforeAndAfterAll {
+class AuthorizationTest extends FunSuite with BeforeAndAfterAll {
 
-  import org.apache.spark.sql.RangerSparkTestUtils._
-  private val spark = TestHive.sparkSession
+  import org.apache.spark.sql.SubmarineSparkUtils._
+  private val spark = TestHive.sparkSession.newSession()
   private lazy val sql = spark.sql _
 
   override def beforeAll(): Unit = {
     super.beforeAll()
-    spark.conf.set(SQLConf.CROSS_JOINS_ENABLED.key, "true")
 
     sql(
       """
-        |CREATE TABLE default.rangertbl1 AS SELECT * FROM default.src
+        |CREATE TABLE IF NOT EXISTS default.rangertbl1 AS SELECT * FROM 
default.src
       """.stripMargin)
 
     sql(
       """
-        |CREATE TABLE default.rangertbl2 AS SELECT * FROM default.src
-      """.stripMargin)
-
-    sql(
-      """
-        |CREATE TABLE default.rangertbl3 AS SELECT * FROM default.src
-      """.stripMargin)
-
-    sql(
-      """
-        |CREATE TABLE default.rangertbl4 AS SELECT * FROM default.src
-      """.stripMargin)
-
-    sql(
-      """
-        |CREATE TABLE default.rangertbl5 AS SELECT * FROM default.src
-      """.stripMargin)
-
-    sql(
-      """
-        |CREATE TABLE default.rangertbl6 AS SELECT * FROM default.src
+        |CREATE TABLE IF NOT EXISTS default.rangertbl2 AS SELECT * FROM 
default.src
       """.stripMargin)
 
     sql(
@@ -77,10 +55,10 @@ class SparkRangerAuthorizerTest extends FunSuite with 
BeforeAndAfterAll {
     }
 
     withUser("alice") {
-      assert(sql("show tables").count() === 7)
+      assert(sql("show tables").count() === 3)
     }
     withUser("bob") {
-      assert(sql("show tables").count() === 7)
+      assert(sql("show tables").count() === 3)
     }
 
     enableAuthorizer(spark)
@@ -109,7 +87,7 @@ class SparkRangerAuthorizerTest extends FunSuite with 
BeforeAndAfterAll {
       assert(sql("show tables").count() === 0)
     }
     withUser("bob") {
-      assert(sql("show tables").count() === 7)
+      assert(sql("show tables").count() === 3)
     }
   }
 
diff --git 
a/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RowFilterSQLTest.scala
 
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RowFilterSQLTest.scala
new file mode 100644
index 0000000..d0024bb
--- /dev/null
+++ 
b/submarine-security/spark-security/src/test/scala/org/apache/submarine/spark/security/RowFilterSQLTest.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.submarine.spark.security
+
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.SubmarineSparkUtils._
+import org.apache.spark.sql.catalyst.plans.logical.{Project, 
SubmarineRowFilter}
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class RowFilterSQLTest extends FunSuite with BeforeAndAfterAll {
+
+  private val spark = TestHive.sparkSession.newSession()
+  private lazy val sql = spark.sql _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl1 AS SELECT * FROM 
default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl2 AS SELECT * FROM 
default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl3 AS SELECT * FROM 
default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl4 AS SELECT * FROM 
default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl5 AS SELECT * FROM 
default.src
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE IF NOT EXISTS default.rangertbl6 AS SELECT * FROM 
default.src
+      """.stripMargin)
+    enableRowFilter(spark)
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    spark.reset()
+  }
+
+
+  test("simple query") {
+    val statement = "select * from default.src"
+    withUser("bob") {
+      val df = sql(statement)
+      
assert(df.queryExecution.optimizedPlan.find(_.isInstanceOf[SubmarineRowFilter]).nonEmpty)
+      val row = df.take(1)(0)
+      assert(row.getInt(0) < 20, "keys above 20 should be filtered 
automatically")
+      assert(df.count() === 20, "keys above 20 should be filtered 
automatically")
+    }
+    withUser("alice") {
+      val df = sql(statement)
+      assert(df.count() === 500)
+    }
+  }
+
+  test("projection with ranger filter key") {
+    val statement = "select key from default.src"
+    withUser("bob") {
+      val df = sql(statement)
+      val row = df.take(1)(0)
+      assert(row.getInt(0) < 20)
+    }
+    withUser("alice") {
+      val df = sql(statement)
+      assert(df.count() === 500)
+    }
+  }
+
+  test("projection without ranger filter key") {
+    val statement = "select value from default.src"
+    withUser("bob") {
+      val df = sql(statement)
+      val row = df.take(1)(0)
+      assert(row.getString(0).split("_")(1).toInt < 20)
+    }
+    withUser("alice") {
+      val df = sql(statement)
+      assert(df.count() === 500)
+    }
+  }
+
+  test("filter with with ranger filter key") {
+    val statement = "select key from default.src where key = 0"
+    val statement2 = "select key from default.src where key >= 20"
+    withUser("bob") {
+      val df = sql(statement)
+      val row = df.take(1)(0)
+      assert(row.getInt(0) === 0)
+      val df2 = sql(statement2)
+      assert(df2.count() === 0, "all keys should be filtered")
+    }
+    withUser("alice") {
+      val df = sql(statement)
+      assert(df.count() === 3)
+      val df2 = sql(statement2)
+      assert(df2.count() === 480)
+    }
+  }
+
+  test("WITH alias") {
+    val statement = "select key as k1, value v1 from default.src"
+    withUser("bob") {
+      val df = sql(statement)
+      val row = df.take(1)(0)
+      assert(row.getInt(0) < 20, "keys above 20 should be filtered 
automatically")
+      assert(df.count() === 20, "keys above 20 should be filtered 
automatically")
+    }
+    withUser("alice") {
+      val df = sql(statement)
+      assert(df.count() === 500)
+    }
+  }
+
+  test("aggregate") {
+    val statement = "select sum(key) as k1, value v1 from default.src group by 
v1"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getString(1).split("_")(1).toInt < 20)
+    }
+    withUser("alice") {
+      val df = sql(statement)
+      assert(df.count() === 309)
+    }
+  }
+
+  test("with equal expression") {
+    val statement = "select * from default.rangertbl1"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getInt(0) === 0, "rangertbl1 has an internal expression 
key=0")
+    }
+  }
+
+  test("with in set") {
+    val statement = "select * from default.rangertbl2"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val row = df.take(1)(0)
+      assert(row.getInt(0) === 0, "rangertbl2 has an internal expression key 
in (0, 1, 2)")
+    }
+  }
+
+  test("with in subquery") {
+    val statement = "select * from default.rangertbl3"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val rows = df.collect()
+      assert(rows.forall(_.getInt(0) < 100), "rangertbl3 has an internal 
expression key in (query)")
+    }
+  }
+
+  test("with in subquery self joined") {
+    val statement = "select * from default.rangertbl4"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val rows = df.collect()
+      assert(rows.length === 500)
+    }
+  }
+
+  test("with udf") {
+    val statement = "select * from default.rangertbl5"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val rows = df.collect()
+      assert(rows.length === 0)
+    }
+  }
+
+  test("with multiple expressions") {
+    val statement = "select * from default.rangertbl6"
+    withUser("bob") {
+      val df = sql(statement)
+      println(df.queryExecution.optimizedPlan)
+      val rows = df.collect()
+      assert(rows.forall { r => val x = r.getInt(0); x > 1 && x < 10 || x == 
500 })
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to