This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new 520510ff5 [KYUUBI #4617] [AUTHZ] Collect results for filtered show
objects ahead to prevent holding unserializable spark plan
520510ff5 is described below
commit 520510ff56e1162394aaa7f2df70679d435ebaa8
Author: liangbowen <[email protected]>
AuthorDate: Fri Mar 31 13:49:36 2023 +0800
[KYUUBI #4617] [AUTHZ] Collect results for filtered show objects ahead to
prevent holding unserializable spark plan
### _Why are the changes needed?_
To fix #4617.
- The reason for issue #4617 is that the delegated SparkPlan is not serializable
during execution
- Collect results for filtered show objects ahead of time in
FilterDataSourceV2Strategy so the exec node no longer holds the delegated plan
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #4634 from bowenliang123/4617-filter.
Closes #4617
fe00ef58a [liangbowen] rename results to result
65ce03a51 [liangbowen] fix 4617
Authored-by: liangbowen <[email protected]>
Signed-off-by: liangbowen <[email protected]>
(cherry picked from commit 92f191a660699414c02ed94c6834253d6a42318d)
Signed-off-by: liangbowen <[email protected]>
---
.../authz/ranger/FilterDataSourceV2Strategy.scala | 7 +++--
.../authz/ranger/FilteredShowObjectsExec.scala | 36 +++++++++++++++-------
.../authz/ranger/RangerSparkExtensionSuite.scala | 2 ++
3 files changed, 32 insertions(+), 13 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala
index 1109464ac..d39aacdcf 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilterDataSourceV2Strategy.scala
@@ -25,10 +25,13 @@ import
org.apache.kyuubi.plugin.spark.authz.util.ObjectFilterPlaceHolder
class FilterDataSourceV2Strategy(spark: SparkSession) extends Strategy {
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowNamespaces"
=>
-
spark.sessionState.planner.plan(child).map(FilteredShowNamespaceExec).toSeq
+ spark.sessionState.planner.plan(child)
+ .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq
case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowTables" =>
- spark.sessionState.planner.plan(child).map(FilteredShowTablesExec).toSeq
+ spark.sessionState.planner.plan(child)
+ .map(FilteredShowTablesExec(_, spark.sparkContext)).toSeq
+
case _ => Nil
}
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala
index 7cc777d9b..67519118e 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/FilteredShowObjectsExec.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.plugin.spark.authz.ranger
import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -26,24 +27,29 @@ import org.apache.kyuubi.plugin.spark.authz.{ObjectType,
OperationType}
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils
trait FilteredShowObjectsExec extends LeafExecNode {
- def delegated: SparkPlan
+ def result: Array[InternalRow]
- final override def output: Seq[Attribute] = delegated.output
-
- final private lazy val result = {
- delegated.executeCollect().filter(isAllowed(_,
AuthZUtils.getAuthzUgi(sparkContext)))
- }
+ override def output: Seq[Attribute]
final override def doExecute(): RDD[InternalRow] = {
sparkContext.parallelize(result, 1)
}
+}
- protected def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean
+trait FilteredShowObjectsCheck {
+ def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean
}
-case class FilteredShowNamespaceExec(delegated: SparkPlan) extends
FilteredShowObjectsExec {
+case class FilteredShowNamespaceExec(result: Array[InternalRow], output:
Seq[Attribute])
+ extends FilteredShowObjectsExec {}
+object FilteredShowNamespaceExec extends FilteredShowObjectsCheck {
+ def apply(delegated: SparkPlan, sc: SparkContext): FilteredShowNamespaceExec
= {
+ val result = delegated.executeCollect()
+ .filter(isAllowed(_, AuthZUtils.getAuthzUgi(sc)))
+ new FilteredShowNamespaceExec(result, delegated.output)
+ }
- override protected def isAllowed(r: InternalRow, ugi: UserGroupInformation):
Boolean = {
+ override def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean =
{
val database = r.getString(0)
val resource = AccessResource(ObjectType.DATABASE, database, null, null)
val request = AccessRequest(resource, ugi, OperationType.SHOWDATABASES,
AccessType.USE)
@@ -52,8 +58,16 @@ case class FilteredShowNamespaceExec(delegated: SparkPlan)
extends FilteredShowO
}
}
-case class FilteredShowTablesExec(delegated: SparkPlan) extends
FilteredShowObjectsExec {
- override protected def isAllowed(r: InternalRow, ugi: UserGroupInformation):
Boolean = {
+case class FilteredShowTablesExec(result: Array[InternalRow], output:
Seq[Attribute])
+ extends FilteredShowObjectsExec {}
+object FilteredShowTablesExec extends FilteredShowObjectsCheck {
+ def apply(delegated: SparkPlan, sc: SparkContext): FilteredShowTablesExec
= {
+ val result = delegated.executeCollect()
+ .filter(isAllowed(_, AuthZUtils.getAuthzUgi(sc)))
+ new FilteredShowTablesExec(result, delegated.output)
+ }
+
+ override def isAllowed(r: InternalRow, ugi: UserGroupInformation): Boolean =
{
val database = r.getString(0)
val table = r.getString(1)
val isTemp = r.getBoolean(2)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 77ef32454..6479af3c7 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -310,6 +310,7 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
doAs("admin", assert(sql(s"show tables from $db").collect().length ===
2))
doAs("bob", assert(sql(s"show tables from $db").collect().length === 0))
doAs("i_am_invisible", assert(sql(s"show tables from
$db").collect().length === 0))
+ doAs("i_am_invisible", assert(sql(s"show tables from
$db").limit(1).isEmpty))
}
}
@@ -324,6 +325,7 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
doAs("bob", assert(sql(s"SHOW DATABASES").collect().length == 1))
doAs("bob", assert(sql(s"SHOW
DATABASES").collectAsList().get(0).getString(0) == "default"))
+ doAs("i_am_invisible", assert(sql(s"SHOW DATABASES").limit(1).isEmpty))
}
}