This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 831117e  SUBMARINE-443. Predicates push down support through submarine 
temp markers
831117e is described below

commit 831117e3fcabefa14c15c2505aea63e5efba6e40
Author: Kent Yao <[email protected]>
AuthorDate: Fri Mar 20 22:25:11 2020 +0800

    SUBMARINE-443. Predicates push down support through submarine temp markers
    
    ### What is this PR for?
    
     Predicates push down support through submarine temp markers to avoid potentially
slowing down users' queries
    
    ### What type of PR is it?
    improvement
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * Open an issue on Jira https://issues.apache.org/jira/browse/SUBMARINE-443
    
    ### How should this be tested?
    new ut
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Is there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Kent Yao <[email protected]>
    
    Closes #241 from yaooqinn/SUBMARINE-443 and squashes the following commits:
    
    7919ff3 [Kent Yao] SUBMARINE-443. Predicates push down support through 
submarine temp markers
---
 .../SubmarinePushPredicatesThroughExtensions.scala | 34 +++++++++++++++
 .../security/api/RangerSparkSQLExtension.scala     |  3 +-
 .../org/apache/spark/sql/SubmarineSparkUtils.scala |  5 ++-
 ...marinePushPredicatesThroughExtensionsTest.scala | 49 ++++++++++++++++++++++
 4 files changed, 89 insertions(+), 2 deletions(-)

diff --git 
a/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarinePushPredicatesThroughExtensions.scala
 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarinePushPredicatesThroughExtensions.scala
new file mode 100644
index 0000000..fb442d9
--- /dev/null
+++ 
b/submarine-security/spark-security/src/main/scala/org/apache/spark/sql/catalyst/optimizer/SubmarinePushPredicatesThroughExtensions.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
SubmarineDataMasking, SubmarineRowFilter}
+import org.apache.spark.sql.catalyst.rules.Rule
+
+case class SubmarinePushPredicatesThroughExtensions(spark: SparkSession) 
extends Rule[LogicalPlan] {
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(_, SubmarineRowFilter(ch)) =>
+      SubmarineRowFilter(f.copy(child = ch))
+    case f @ Filter(_, SubmarineDataMasking(ch)) =>
+      SubmarineDataMasking(f.copy(child = ch))
+  }
+}
diff --git 
a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/api/RangerSparkSQLExtension.scala
 
b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/api/RangerSparkSQLExtension.scala
index 681a169..efa286e 100644
--- 
a/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/api/RangerSparkSQLExtension.scala
+++ 
b/submarine-security/spark-security/src/main/scala/org/apache/submarine/spark/security/api/RangerSparkSQLExtension.scala
@@ -20,7 +20,7 @@
 package org.apache.submarine.spark.security.api
 
 import org.apache.spark.sql.SparkSessionExtensions
-import 
org.apache.spark.sql.catalyst.optimizer.{SubmarineConfigurationCheckExtension, 
SubmarineDataMaskingExtension, SubmarineRowFilterExtension, 
SubmarineSparkRangerAuthorizationExtension}
+import 
org.apache.spark.sql.catalyst.optimizer.{SubmarineConfigurationCheckExtension, 
SubmarineDataMaskingExtension, SubmarinePushPredicatesThroughExtensions, 
SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
 import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
 
 import org.apache.submarine.spark.security.Extensions
@@ -43,6 +43,7 @@ class RangerSparkSQLExtension extends Extensions {
     ext.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
     ext.injectOptimizerRule(SubmarineRowFilterExtension)
     ext.injectOptimizerRule(SubmarineDataMaskingExtension)
+    ext.injectOptimizerRule(SubmarinePushPredicatesThroughExtensions)
     ext.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
   }
 }
diff --git 
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
 
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
index f46ac69..fbe0f3f 100644
--- 
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
+++ 
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/SubmarineSparkUtils.scala
@@ -22,7 +22,7 @@ package org.apache.spark.sql
 import java.security.PrivilegedExceptionAction
 
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.sql.catalyst.optimizer.{SubmarineDataMaskingExtension, 
SubmarineRowFilterExtension, SubmarineSparkRangerAuthorizationExtension}
+import org.apache.spark.sql.catalyst.optimizer.{SubmarineDataMaskingExtension, 
SubmarinePushPredicatesThroughExtensions, SubmarineRowFilterExtension, 
SubmarineSparkRangerAuthorizationExtension}
 import org.apache.spark.sql.execution.SubmarineSparkPlanOmitStrategy
 
 object SubmarineSparkUtils {
@@ -40,11 +40,13 @@ object SubmarineSparkUtils {
 
   def enableRowFilter(spark: SparkSession): Unit = {
     spark.extensions.injectOptimizerRule(SubmarineRowFilterExtension)
+    
spark.extensions.injectOptimizerRule(SubmarinePushPredicatesThroughExtensions)
     spark.extensions.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
   }
 
   def enableDataMasking(spark: SparkSession): Unit = {
     spark.extensions.injectOptimizerRule(SubmarineDataMaskingExtension)
+    
spark.extensions.injectOptimizerRule(SubmarinePushPredicatesThroughExtensions)
     spark.extensions.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
   }
 
@@ -52,6 +54,7 @@ object SubmarineSparkUtils {
     
spark.extensions.injectOptimizerRule(SubmarineSparkRangerAuthorizationExtension)
     spark.extensions.injectOptimizerRule(SubmarineRowFilterExtension)
     spark.extensions.injectOptimizerRule(SubmarineDataMaskingExtension)
+    
spark.extensions.injectOptimizerRule(SubmarinePushPredicatesThroughExtensions)
     spark.extensions.injectPlannerStrategy(SubmarineSparkPlanOmitStrategy)
   }
 }
diff --git 
a/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarinePushPredicatesThroughExtensionsTest.scala
 
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarinePushPredicatesThroughExtensionsTest.scala
new file mode 100644
index 0000000..fcaa835
--- /dev/null
+++ 
b/submarine-security/spark-security/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SubmarinePushPredicatesThroughExtensionsTest.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Project, 
SubmarineDataMasking, SubmarineRowFilter}
+import org.apache.spark.sql.hive.test.TestHive
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+class SubmarinePushPredicatesThroughExtensionsTest extends FunSuite with 
BeforeAndAfterAll {
+
+    private val spark = TestHive.sparkSession.newSession()
+
+    override def afterAll(): Unit = {
+      super.afterAll()
+      spark.reset()
+    }
+
+    test("applying condition to original query if data masking exists in 
ranger") {
+      val extension = SubmarinePushPredicatesThroughExtensions(spark)
+      val frame = spark.sql("select * from src")
+      val f = spark.sessionState.sqlParser.parseExpression("src.key = 1")
+      val masking = SubmarineDataMasking(frame.queryExecution.optimizedPlan)
+      val filtering = SubmarineRowFilter(masking)
+      val plan = extension.apply(Filter(f, filtering))
+      assert(!plan.isInstanceOf[Filter])
+      assert(plan match {
+        case SubmarineRowFilter(SubmarineDataMasking(Filter(_, _))) => true
+        case _ => false
+      })
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to