This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new f67140e65 [KYUUBI #5594][AUTHZ] BuildQuery should respect normal
node's input
f67140e65 is described below
commit f67140e650d4fb335b43a65eb2678192c9b0b29a
Author: Angerszhuuuu <[email protected]>
AuthorDate: Sun Feb 4 16:39:55 2024 +0800
[KYUUBI #5594][AUTHZ] BuildQuery should respect normal node's input
# :mag: Description
## Issue References ๐
This pull request fixes #5594
## Describe Your Solution ๐ง
For case
```
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df = spark.read.table("test_mapinpandas")
execute_result = df.mapInPandas(filter_func, df.schema).show()
```
The logical plan is
```
GlobalLimit 21
+- LocalLimit 21
+- Project [cast(id#5 as string) AS id#11, name#6]
+- MapInPandas filter_func(id#0, name#1), [id#5, name#6]
+- HiveTableRelation [`default`.`test_mapinpandas`,
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#0, name#1],
Partition Cols: []]
```
When handle `MapInPandas`, we didn't match its input with
`HiveTableRelation`, cause we miss input table's columns. This pr fix this
In this pr, we remove the branch of each project such as `Project`,
`Aggregate` etc, handle it together.
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan ๐งช
#### Behavior Without This Pull Request :coffin:
For case
```
def filter_func(iterator):
for pdf in iterator:
yield pdf[pdf.id == 1]
df = spark.read.table("test_mapinpandas")
execute_result = df.mapInPandas(filter_func, df.schema).show()
```
We miss column info of table `test_mapinpandas`
#### Behavior With This Pull Request :tada:
We got privilege object of table `test_mapinpandas` with it's column info.
#### Related Unit Tests
---
# Checklists
## ๐ Author Self Checklist
- [x] My code follows the [style
guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html)
of this project
- [x] I have performed a self-review
- [x] I have commented my code, particularly in hard-to-understand areas
- [x] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my feature
works
- [x] New and existing unit tests pass locally with my changes
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
## ๐ Committer Pre-Merge Checklist
- [x] Pull request title is okay.
- [x] No license issues.
- [x] Milestone correctly set?
- [x] Test coverage is ok
- [x] Assignees are selected.
- [x] Minimum number of approvals
- [x] No changes are requested
**Be nice. Be informative.**
Closes #5787 from AngersZhuuuu/KYUUBI-5594-approach2.
Closes #5594
e08545599 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
49f09fb0a [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
4781f75b9 [Angerszhuuuu] Update PrivilegesBuilderSuite.scala
9e9208d38 [Angerszhuuuu] Update
V2JdbcTableCatalogRangerSparkExtensionSuite.scala
626d3dd88 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
3d69997de [Angerszhuuuu] Update PrivilegesBuilderSuite.scala
6eb4b8e1a [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
61efb8ae3 [Angerszhuuuu] update
794ebb7be [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2
a236da86b [Angerszhuuuu] Update PrivilegesBuilderSuite.scala
74bd3f4d5 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
4acbc4276 [Angerszhuuuu] Merge branch 'KYUUBI-5594-approach2' of
https://github.com/AngersZhuuuu/incubator-kyuubi into KYUUBI-5594-approach2
266f7e877 [Angerszhuuuu] update
a6c784546 [Angerszhuuuu] Update PrivilegesBuilder.scala
d785d5fdf [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2
014ef3b84 [Angerszhuuuu] Update PrivilegesBuilder.scala
7e1cd37a1 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2
71d266162 [Angerszhuuuu] update
db9594170 [Angerszhuuuu] update
490eb95c2 [Angerszhuuuu] update
70d110e89 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2
e6a587718 [Angerszhuuuu] Update PrivilegesBuilder.scala
5ff22b103 [Angerszhuuuu] Update PrivilegesBuilder.scala
e6843014b [Angerszhuuuu] Update PrivilegesBuilder.scala
594b202f7 [Angerszhuuuu] Update PrivilegesBuilder.scala
2f87c61e1 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
1de8c1c68 [Angerszhuuuu] Update PrivilegesBuilder.scala
ad17255d7 [Angerszhuuuu] Update PrivilegesBuilderSuite.scala
4f5e8505f [Angerszhuuuu] update
64349ed97 [Angerszhuuuu] Update PrivilegesBuilder.scala
11b7a4c13 [Angerszhuuuu] Update PrivilegesBuilder.scala
9a58fb0c4 [Angerszhuuuu] update
d0b022ec9 [Angerszhuuuu] Update RuleApplyPermanentViewMarker.scala
e0f28a640 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594
0ebdd5de5 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594
8e53236ac [Angerszhuuuu] update
3bafa7ca5 [Angerszhuuuu] update
d6e984e07 [Angerszhuuuu] update
b00bf5e20 [Angerszhuuuu] Update PrivilegesBuilder.scala
821422852 [Angerszhuuuu] update
93fc6892b [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594
04184e39d [Angerszhuuuu] update
0bb762467 [Angerszhuuuu] Revert "Revert "Update PrivilegesBuilder.scala""
f481283ae [Angerszhuuuu] Revert "Update PrivilegesBuilder.scala"
9f871822f [Angerszhuuuu] Revert "Update PrivilegesBuilder.scala"
29b67c457 [Angerszhuuuu] Update PrivilegesBuilder.scala
8785ad1ab [Angerszhuuuu] Update PrivilegesBuilder.scala
270f21dcc [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
60872efcb [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
c34f32ea2 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594
86fc4756a [Angerszhuuuu] Update PrivilegesBuilder.scala
404f1ea4c [Angerszhuuuu] Update PrivilegesBuilder.scala
dcca394e0 [Angerszhuuuu] Update PrivilegesBuilder.scala
c2c6fa447 [Angerszhuuuu] Update PrivilegesBuilder.scala
6f6a36e5b [Angerszhuuuu] Merge branch 'master' into
KYUUBI-5594]-AUTH]BuildQuery-should-respect-normal-node's-input
4dd47a124 [Angerszhuuuu] update
c549b6a1a [Angerszhuuuu] update
80013b981 [Angerszhuuuu] Update PrivilegesBuilder.scala
3cbba422a [Angerszhuuuu] Update PrivilegesBuilder.scala
Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../plugin/spark/authz/PrivilegesBuilder.scala | 93 +++++++-----
.../spark/authz/PrivilegesBuilderSuite.scala | 44 +++---
.../spark/authz/V2CommandsPrivilegesSuite.scala | 4 +-
.../DeltaCatalogRangerSparkExtensionSuite.scala | 33 +++--
.../HudiCatalogRangerSparkExtensionSuite.scala | 2 +-
.../IcebergCatalogRangerSparkExtensionSuite.scala | 126 +++++++++-------
.../authz/ranger/RangerSparkExtensionSuite.scala | 163 +++++++++++----------
...JdbcTableCatalogRangerSparkExtensionSuite.scala | 155 ++++++++++----------
8 files changed, 337 insertions(+), 283 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index 2d452ba9d..01266eb2c 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.plugin.spark.authz
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression,
NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.ExplainCommand
import org.slf4j.LoggerFactory
@@ -69,44 +69,20 @@ object PrivilegesBuilder {
if (projectionList.isEmpty) {
privilegeObjects += PrivilegeObject(table, plan.output.map(_.name))
} else {
- val cols = (projectionList ++ conditionList).flatMap(collectLeaves)
- .filter(plan.outputSet.contains).map(_.name).distinct
- privilegeObjects += PrivilegeObject(table, cols)
+ val cols = columnPrune(projectionList ++ conditionList, plan.outputSet)
+ privilegeObjects += PrivilegeObject(table, cols.map(_.name).distinct)
}
}
+ def columnPrune(projectionList: Seq[Expression], output: AttributeSet):
Seq[NamedExpression] = {
+ (projectionList ++ conditionList)
+ .flatMap(collectLeaves)
+ .filter(output.contains)
+ }
+
plan match {
case p if p.getTagValue(KYUUBI_AUTHZ_TAG).nonEmpty =>
- case p: Project => buildQuery(p.child, privilegeObjects, p.projectList,
conditionList, spark)
-
- case j: Join =>
- val cols =
- conditionList ++ j.condition.map(expr =>
collectLeaves(expr)).getOrElse(Nil)
- buildQuery(j.left, privilegeObjects, projectionList, cols, spark)
- buildQuery(j.right, privilegeObjects, projectionList, cols, spark)
-
- case f: Filter =>
- val cols = conditionList ++ collectLeaves(f.condition)
- buildQuery(f.child, privilegeObjects, projectionList, cols, spark)
-
- case w: Window =>
- val orderCols = w.orderSpec.flatMap(orderSpec =>
collectLeaves(orderSpec))
- val partitionCols = w.partitionSpec.flatMap(partitionSpec =>
collectLeaves(partitionSpec))
- val cols = conditionList ++ orderCols ++ partitionCols
- buildQuery(w.child, privilegeObjects, projectionList, cols, spark)
-
- case s: Sort =>
- val sortCols = s.order.flatMap(sortOrder => collectLeaves(sortOrder))
- val cols = conditionList ++ sortCols
- buildQuery(s.child, privilegeObjects, projectionList, cols, spark)
-
- case a: Aggregate =>
- val aggCols =
- (a.aggregateExpressions ++ a.groupingExpressions).flatMap(e =>
collectLeaves(e))
- val cols = conditionList ++ aggCols
- buildQuery(a.child, privilegeObjects, projectionList, cols, spark)
-
case scan if isKnownScan(scan) && scan.resolved =>
val tables = getScanSpec(scan).tables(scan, spark)
// If the the scan is table-based, we check privileges on the table we
found
@@ -125,7 +101,33 @@ object PrivilegesBuilder {
case p =>
for (child <- p.children) {
- buildQuery(child, privilegeObjects, projectionList, conditionList,
spark)
+ // If current plan's references don't have relation to it's input,
have two cases
+ // 1. `MapInPandas`, `ScriptTransformation`
+ // 2. `Project` output only have constant value
+ if (columnPrune(p.references.toSeq ++ p.output, p.inputSet).isEmpty)
{
+ // If plan is project and output don't have relation to input, can
ignore.
+ if (!p.isInstanceOf[Project]) {
+ buildQuery(
+ child,
+ privilegeObjects,
+ p.inputSet.map(_.toAttribute).toSeq,
+ Nil,
+ spark)
+ }
+ } else {
+ buildQuery(
+ child,
+ privilegeObjects,
+ // Here we use `projectList ++ p.reference` do column prune.
+ // For `Project`, `Aggregate`, plan's output is contained by
plan's referenced
+ // For `Filter`, `Sort` etc... it rely on upper `Project` node,
+ // since we wrap a `Project` before call `buildQuery()`.
+ // So here we use upper node's projectionList and current's
references
+ // to do column pruning can get the correct column.
+ columnPrune(projectionList ++ p.references.toSeq,
p.inputSet).distinct,
+ conditionList ++ p.references,
+ spark)
+ }
}
}
}
@@ -221,7 +223,26 @@ object PrivilegesBuilder {
LOG.debug(ud.error(plan, e))
}
}
- spec.queries(plan).foreach(buildQuery(_, inputObjs, spark = spark))
+ spec.queries(plan).foreach { p =>
+ if (p.resolved) {
+ buildQuery(Project(p.output, p), inputObjs, spark = spark)
+ } else {
+ try {
+ // For spark 3.1, Some command such as CreateTableASSelect, its
query was unresolved,
+ // Before this pr, we just ignore it, now we support this.
+ val analyzed = spark.sessionState.analyzer.execute(p)
+ buildQuery(Project(analyzed.output, analyzed), inputObjs, spark
= spark)
+ } catch {
+ case e: Exception =>
+ LOG.debug(
+ s"""
+ |Failed to analyze unresolved
+ |$p
+ |due to ${e.getMessage}""".stripMargin,
+ e)
+ }
+ }
+ }
spec.operationType
case classname if FUNCTION_COMMAND_SPECS.contains(classname) =>
@@ -315,7 +336,7 @@ object PrivilegesBuilder {
case cmd: Command => buildCommand(cmd, inputObjs, outputObjs, spark)
// Queries
case _ =>
- buildQuery(plan, inputObjs, spark = spark)
+ buildQuery(Project(plan.output, plan), inputObjs, spark = spark)
OperationType.QUERY
}
(inputObjs, outputObjs, opType)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
index 673a2e437..d8b672a56 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
@@ -59,11 +59,15 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
protected def checkColumns(plan: LogicalPlan, cols: Seq[String]): Unit = {
val (in, out, _) = PrivilegesBuilder.build(plan, spark)
assert(out.isEmpty, "Queries shall not check output privileges")
- val po = in.head
- assert(po.actionType === PrivilegeObjectActionType.OTHER)
- assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
- assert(po.columns === cols)
- checkTableOwner(po)
+ if (in.nonEmpty) {
+ val po = in.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.columns === cols)
+ checkTableOwner(po)
+ } else {
+ assert(cols.isEmpty)
+ }
}
protected def checkColumns(query: String, cols: Seq[String]): Unit = {
@@ -365,7 +369,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assertEqualsIgnoreCase(reusedPartTableShort)(po0.objectName)
if (isSparkV32OrGreater) {
// Query in AlterViewAsCommand can not be resolved before SPARK-34698
- assert(po0.columns === Seq("key", "value", "pid"))
+ assert(po0.columns === Seq("key", "pid", "value"))
checkTableOwner(po0)
}
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
@@ -526,12 +530,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
- if (isSparkV32OrGreater) {
- assert(po0.columns.head === "key")
- checkTableOwner(po0)
- } else {
- assert(po0.columns.isEmpty)
- }
+ assert(po0.columns.head === "key")
+ checkTableOwner(po0)
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)
@@ -549,12 +549,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
- if (isSparkV32OrGreater) {
- assert(po0.columns === Seq("key", "value"))
- checkTableOwner(po0)
- } else {
- assert(po0.columns.isEmpty)
- }
+ assert(po0.columns === Seq("key", "value"))
+ checkTableOwner(po0)
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)
@@ -1050,7 +1046,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assertEqualsIgnoreCase(reusedDb)(po.dbname)
assertStartsWithIgnoreCase(reusedTableShort)(po.objectName)
assert(
- po.columns === Seq("value", "pid", "key"),
+ po.columns === Seq("value", "key", "pid"),
s"$reusedPartTable both 'key', 'value' and 'pid' should be
authenticated")
checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = true)
@@ -1107,7 +1103,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assertEqualsIgnoreCase(reusedDb)(po.dbname)
assertStartsWithIgnoreCase(reusedTableShort)(po.objectName)
assert(
- po.columns === Seq("key", "value"),
+ po.columns.sorted === Seq("key", "value").sorted,
s"$reusedPartTable 'key' is the join key and 'pid' is omitted")
checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = true)
@@ -1218,7 +1214,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assertEqualsIgnoreCase(reusedDb)(po.dbname)
assertStartsWithIgnoreCase(reusedTableShort)(po.objectName)
assert(
- po.columns === Seq("key", "value", "pid"),
+ po.columns === Seq("key", "pid", "value"),
s"$reusedPartTable both 'key', 'value' and 'pid' should be
authenticated")
checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = true)
@@ -1625,7 +1621,7 @@ class HiveCatalogPrivilegeBuilderSuite extends
PrivilegesBuilderSuite {
assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
assert(po0.objectName equalsIgnoreCase reusedPartTable.split("\\.").last)
- assert(po0.columns === Seq("key", "value", "pid"))
+ assert(po0.columns === Seq("key", "pid", "value"))
checkTableOwner(po0)
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)
@@ -1721,7 +1717,7 @@ class HiveCatalogPrivilegeBuilderSuite extends
PrivilegesBuilderSuite {
assert(out1.isEmpty)
val pi1 = in1.head
assert(pi1.columns.size === 3)
- assert(pi1.columns === Seq("key", "value", "pid"))
+ assert(pi1.columns === Seq("key", "pid", "value"))
// case2: Some columns are involved, and the group column is not selected.
val plan2 = sql(s"SELECT COUNT(key) FROM $reusedPartTable GROUP BY pid")
@@ -1741,7 +1737,7 @@ class HiveCatalogPrivilegeBuilderSuite extends
PrivilegesBuilderSuite {
assert(out3.isEmpty)
val pi3 = in3.head
assert(pi3.columns.size === 2)
- assert(pi3.columns === Seq("key", "pid"))
+ assert(pi3.columns === Seq("pid", "key"))
// case4: HAVING & GROUP clause
val plan4 = sql(s"SELECT COUNT(key) FROM $reusedPartTable GROUP BY pid
HAVING MAX(key) > 1000")
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
index 62b7939b3..1b6e07b77 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
@@ -127,7 +127,7 @@ abstract class V2CommandsPrivilegesSuite extends
PrivilegesBuilderSuite {
assert(po0.catalog.isEmpty)
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
- assert(po0.columns.take(2) === Seq("key", "value"))
+ assert(po0.columns === Seq("a", "key", "value"))
checkTableOwner(po0)
assert(outputs.size === 1)
@@ -186,7 +186,7 @@ abstract class V2CommandsPrivilegesSuite extends
PrivilegesBuilderSuite {
assert(po0.catalog.isEmpty)
assertEqualsIgnoreCase(reusedDb)(po0.dbname)
assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
- assert(po0.columns.take(2) === Seq("key", "value"))
+ assert(po0.columns === Seq("a", "key", "value"))
checkTableOwner(po0)
assert(outputs.size === 1)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
index 1ce8ad676..dbf88d7d0 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
@@ -215,8 +215,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
s" SELECT * FROM $namespace1.$table2"
interceptEndsWith[AccessControlException](
doAs(someone, sql(insertIntoSql)))(
- s"does not have [select] privilege on
[$namespace1/$table2/id,$namespace1/$table2/name," +
- s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+ s"does not have [select] privilege on " +
+ s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," +
+ s"$namespace1/$table2/id,$namespace1/$table2/name]," +
s" [update] privilege on [$namespace1/$table1]")
doAs(admin, sql(insertIntoSql))
@@ -225,8 +226,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
s" SELECT * FROM $namespace1.$table2"
interceptEndsWith[AccessControlException](
doAs(someone, sql(insertOverwriteSql)))(
- s"does not have [select] privilege on
[$namespace1/$table2/id,$namespace1/$table2/name," +
- s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+ s"does not have [select] privilege on " +
+ s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," +
+ s"$namespace1/$table2/id,$namespace1/$table2/name]," +
s" [update] privilege on [$namespace1/$table1]")
doAs(admin, sql(insertOverwriteSql))
}
@@ -283,8 +285,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
|""".stripMargin
interceptEndsWith[AccessControlException](
doAs(someone, sql(mergeIntoSql)))(
- s"does not have [select] privilege on
[$namespace1/$table2/id,$namespace1/$table2/name," +
- s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+ s"does not have [select] privilege on " +
+ s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," +
+ s"$namespace1/$table2/id,$namespace1/$table2/name]," +
s" [update] privilege on [$namespace1/$table1]")
doAs(admin, sql(mergeIntoSql))
}
@@ -378,9 +381,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
val insertIntoSql = s"INSERT INTO delta.`$path` SELECT * FROM
$namespace1.$table2"
interceptEndsWith[AccessControlException](
doAs(someone, sql(insertIntoSql)))(
- s"does not have [select] privilege on [$namespace1/$table2/id," +
- s"$namespace1/$table2/name,$namespace1/$table2/gender," +
- s"$namespace1/$table2/birthDate], [write] privilege on [[$path,
$path/]]")
+ s"does not have [select] privilege on
[$namespace1/$table2/birthDate," +
+ s"$namespace1/$table2/gender,$namespace1/$table2/id," +
+ s"$namespace1/$table2/name], [write] privilege on [[$path,
$path/]]")
doAs(admin, sql(insertIntoSql))
// insert overwrite
@@ -388,9 +391,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
s"INSERT OVERWRITE delta.`$path` SELECT * FROM $namespace1.$table2"
interceptEndsWith[AccessControlException](
doAs(someone, sql(insertOverwriteSql)))(
- s"does not have [select] privilege on [$namespace1/$table2/id," +
- s"$namespace1/$table2/name,$namespace1/$table2/gender," +
- s"$namespace1/$table2/birthDate], [write] privilege on [[$path,
$path/]]")
+ s"does not have [select] privilege on
[$namespace1/$table2/birthDate," +
+ s"$namespace1/$table2/gender,$namespace1/$table2/id," +
+ s"$namespace1/$table2/name], [write] privilege on [[$path,
$path/]]")
doAs(admin, sql(insertOverwriteSql))
})
}
@@ -433,9 +436,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
|""".stripMargin
interceptEndsWith[AccessControlException](
doAs(someone, sql(mergeIntoSql)))(
- s"does not have [select] privilege on [$namespace1/$table2/id," +
- s"$namespace1/$table2/name,$namespace1/$table2/gender," +
- s"$namespace1/$table2/birthDate], [write] privilege on [[$path,
$path/]]")
+ s"does not have [select] privilege on
[$namespace1/$table2/birthDate," +
+ s"$namespace1/$table2/gender,$namespace1/$table2/id," +
+ s"$namespace1/$table2/name], [write] privilege on [[$path,
$path/]]")
doAs(admin, sql(mergeIntoSql))
})
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
index a7c0c7662..72d4130ef 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
@@ -507,7 +507,7 @@ class HudiCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
interceptEndsWith[AccessControlException] {
doAs(someone, sql(mergeIntoSQL))
}(s"does not have [select] privilege on " +
-
s"[$namespace1/$table2/id,$namespace1/$table2/name,$namespace1/$table2/city], "
+
+
s"[$namespace1/$table2/city,$namespace1/$table2/id,$namespace1/$table2/name], "
+
s"[update] privilege on [$namespace1/$table1]")
doAs(admin, sql(mergeIntoSQL))
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
index 677b3945d..cf798cdfa 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -93,73 +93,87 @@ class IcebergCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite
}
test("[KYUUBI #3515] MERGE INTO") {
- val mergeIntoSql =
- s"""
- |MERGE INTO $catalogV2.$bobNamespace.$bobSelectTable AS target
- |USING $catalogV2.$namespace1.$table1 AS source
- |ON target.id = source.id
- |WHEN MATCHED AND (target.name='delete') THEN DELETE
- |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city
= source.city
+ withSingleCallEnabled {
+ val mergeIntoSql =
+ s"""
+ |MERGE INTO $catalogV2.$bobNamespace.$bobSelectTable AS target
+ |USING $catalogV2.$namespace1.$table1 AS source
+ |ON target.id = source.id
+ |WHEN MATCHED AND (target.name='delete') THEN DELETE
+ |WHEN MATCHED AND (target.name='update') THEN UPDATE SET
target.city = source.city
""".stripMargin
- // MergeIntoTable: Using a MERGE INTO Statement
- val e1 = intercept[AccessControlException](
- doAs(
- someone,
- sql(mergeIntoSql)))
- assert(e1.getMessage.contains(s"does not have [select] privilege" +
- s" on [$namespace1/$table1/id]"))
-
- withSingleCallEnabled {
- interceptEndsWith[AccessControlException](doAs(someone,
sql(mergeIntoSql)))(
- if (isSparkV35OrGreater) {
- s"does not have [select] privilege on [$namespace1/table1/id" +
- s",$namespace1/$table1/name,$namespace1/$table1/city]"
- } else {
- "does not have " +
- s"[select] privilege on
[$namespace1/$table1/id,$namespace1/$table1/name,$namespace1/$table1/city]," +
- s" [update] privilege on [$bobNamespace/$bobSelectTable]"
- })
+ // MergeIntoTable: Using a MERGE INTO Statement
+ val e1 = intercept[AccessControlException](
+ doAs(
+ someone,
+ sql(mergeIntoSql)))
+ assert(e1.getMessage.contains(s"does not have [select] privilege" +
+ s" on
[$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
+
+ withSingleCallEnabled {
+ interceptEndsWith[AccessControlException](doAs(someone,
sql(mergeIntoSql)))(
+ if (isSparkV35OrGreater) {
+ s"does not have [select] privilege on [$namespace1/table1/city" +
+ s",$namespace1/$table1/id,$namespace1/$table1/name]"
+ } else {
+ "does not have " +
+ s"[select] privilege on
[$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]," +
+ s" [update] privilege on [$bobNamespace/$bobSelectTable]"
+ })
+
+ interceptEndsWith[AccessControlException] {
+ doAs(bob, sql(mergeIntoSql))
+ }(s"does not have [update] privilege on
[$bobNamespace/$bobSelectTable]")
+ }
- interceptEndsWith[AccessControlException] {
- doAs(bob, sql(mergeIntoSql))
- }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
+ doAs(admin, sql(mergeIntoSql))
}
-
- doAs(admin, sql(mergeIntoSql))
}
test("[KYUUBI #3515] UPDATE TABLE") {
- // UpdateTable
- interceptEndsWith[AccessControlException] {
- doAs(someone, sql(s"UPDATE $catalogV2.$namespace1.$table1 SET
city='Guangzhou' WHERE id=1"))
- }(if (isSparkV35OrGreater) {
- s"does not have [select] privilege on [$namespace1/$table1/id]"
- } else {
- s"does not have [update] privilege on [$namespace1/$table1]"
- })
+ withSingleCallEnabled {
+ // UpdateTable
+ interceptEndsWith[AccessControlException] {
+ doAs(
+ someone,
+ sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou'
WHERE id=1"))
+ }(if (isSparkV35OrGreater) {
+ s"does not have [select] privilege on " +
+ s"[$namespace1/$table1/_file,$namespace1/$table1/_pos," +
+
s"$namespace1/$table1/id,$namespace1/$table1/name,$namespace1/$table1/city], " +
+ s"[update] privilege on [$namespace1/$table1]"
+ } else {
+ s"does not have [update] privilege on [$namespace1/$table1]"
+ })
- doAs(
- admin,
- sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
- " WHERE id=1"))
+ doAs(
+ admin,
+ sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
+ " WHERE id=1"))
+ }
}
test("[KYUUBI #3515] DELETE FROM TABLE") {
- // DeleteFromTable
- interceptEndsWith[AccessControlException] {
- doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE
id=2"))
- }(if (isSparkV34OrGreater) {
- s"does not have [select] privilege on [$namespace1/$table1/id]"
- } else {
- s"does not have [update] privilege on [$namespace1/$table1]"
- })
-
- interceptEndsWith[AccessControlException] {
- doAs(bob, sql(s"DELETE FROM $catalogV2.$bobNamespace.$bobSelectTable
WHERE id=2"))
- }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
-
- doAs(admin, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
+ withSingleCallEnabled {
+ // DeleteFromTable
+ interceptEndsWith[AccessControlException] {
+ doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE
id=2"))
+ }(if (isSparkV34OrGreater) {
+ s"does not have [select] privilege on " +
+ s"[$namespace1/$table1/_file,$namespace1/$table1/_pos," +
+
s"$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name], " +
+ s"[update] privilege on [$namespace1/$table1]"
+ } else {
+ s"does not have [update] privilege on [$namespace1/$table1]"
+ })
+
+ interceptEndsWith[AccessControlException] {
+ doAs(bob, sql(s"DELETE FROM $catalogV2.$bobNamespace.$bobSelectTable
WHERE id=2"))
+ }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
+
+ doAs(admin, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE
id=2"))
+ }
}
test("[KYUUBI #3666] Support {OWNER} variable for queries run on CatalogV2")
{
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 43333ea77..6feb63eb9 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -23,12 +23,15 @@ import java.nio.file.Path
import scala.util.Try
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.sql.{Row, SparkSessionExtensions}
+import org.apache.spark.sql.{DataFrame, Row, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.scalatest.BeforeAndAfterAll
// scalastyle:off
import org.scalatest.funsuite.AnyFunSuite
@@ -555,11 +558,7 @@ class HiveCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
val e2 = intercept[AccessControlException](
doAs(someone, sql(s"CREATE VIEW $permView AS SELECT * FROM $table")))
- if (isSparkV32OrGreater) {
- assert(e2.getMessage.contains(s"does not have [select] privilege on
[default/$table/id]"))
- } else {
- assert(e2.getMessage.contains(s"does not have [select] privilege on
[$table]"))
- }
+ assert(e2.getMessage.contains(s"does not have [select] privilege on
[default/$table/id]"))
}
}
@@ -638,14 +637,12 @@ class HiveCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
s" FROM $db1.$srcTable1 as tb1" +
s" JOIN $db1.$srcTable2 as tb2" +
s" on tb1.id = tb2.id"
- val e1 = intercept[AccessControlException](doAs(someone,
sql(insertSql1)))
- assert(e1.getMessage.contains(s"does not have [select] privilege on
[$db1/$srcTable1/id]"))
withSingleCallEnabled {
- val e2 = intercept[AccessControlException](doAs(someone,
sql(insertSql1)))
- assert(e2.getMessage.contains(s"does not have" +
+ val e = intercept[AccessControlException](doAs(someone,
sql(insertSql1)))
+ assert(e.getMessage.contains(s"does not have" +
s" [select] privilege on" +
- s" [$db1/$srcTable1/id,$db1/$srcTable1/name,$db1/$srcTable1/city," +
+ s" [$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name," +
s"$db1/$srcTable2/age,$db1/$srcTable2/id]," +
s" [update] privilege on [$db1/$sinkTable1/id,$db1/$sinkTable1/age,"
+
s"$db1/$sinkTable1/name,$db1/$sinkTable1/city]"))
@@ -675,11 +672,13 @@ class HiveCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
sql(s"CREATE TABLE IF NOT EXISTS $db1.$srcTable1" +
s" (id int, name string, city string)"))
- val e1 = intercept[AccessControlException](
- doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from
$db1.$srcTable1")))
- assert(
- e1.getMessage.contains(s"does not have [select] privilege on
[$db1/$srcTable1/id]"))
-
+ withSingleCallEnabled {
+ val e1 = intercept[AccessControlException](
+ doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from
$db1.$srcTable1")))
+ assert(
+ e1.getMessage.contains(s"does not have [select] privilege on " +
+
s"[$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name]"))
+ }
doAs(admin, sql(s"CACHE TABLE $cacheTable3 SELECT 1 AS a, 2 AS b "))
doAs(someone, sql(s"CACHE TABLE $cacheTable4 select 1 as a, 2 as b "))
}
@@ -888,9 +887,15 @@ class HiveCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
doAs(
someone,
sql(s"SELECT id as new_id, name, max_scope FROM
$db1.$view1".stripMargin).show()))
- assert(e2.getMessage.contains(
- s"does not have [select] privilege on " +
- s"[$db1/$view1/id,$db1/$view1/name,$db1/$view1/max_scope]"))
+ if (isSparkV35OrGreater) {
+ assert(e2.getMessage.contains(
+ s"does not have [select] privilege on " +
+ s"[$db1/$view1/id,$db1/$view1/max_scope,$db1/$view1/name]"))
+ } else {
+ assert(e2.getMessage.contains(
+ s"does not have [select] privilege on " +
+ s"[$db1/$view1/name,$db1/$view1/id,$db1/$view1/max_scope]"))
+ }
}
}
}
@@ -927,17 +932,11 @@ class HiveCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
|AS
|SELECT count(*) as cnt, sum(id) as sum_id FROM $db1.$table1
""".stripMargin))
- interceptEndsWith[AccessControlException](
- doAs(someone, sql(s"SELECT count(*) FROM $db1.$table1").show()))(
- s"does not have [select] privilege on
[$db1/$table1/id,$db1/$table1/scope]")
+ checkAnswer(someone, s"SELECT count(*) FROM $db1.$table1", Row(0) ::
Nil)
- interceptEndsWith[AccessControlException](
- doAs(someone, sql(s"SELECT count(*) FROM $db1.$view1").show()))(
- s"does not have [select] privilege on
[$db1/$view1/id,$db1/$view1/scope]")
+ checkAnswer(someone, s"SELECT count(*) FROM $db1.$view1", Row(0) ::
Nil)
- interceptEndsWith[AccessControlException](
- doAs(someone, sql(s"SELECT count(*) FROM $db1.$view2").show()))(
- s"does not have [select] privilege on
[$db1/$view2/cnt,$db1/$view2/sum_id]")
+ checkAnswer(someone, s"SELECT count(*) FROM $db1.$view2", Row(1) ::
Nil)
interceptEndsWith[AccessControlException](
doAs(someone, sql(s"SELECT count(id) FROM $db1.$table1 WHERE id >
10").show()))(
@@ -1321,7 +1320,7 @@ class HiveCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
doAs(
someone,
sql(s"SELECT typeof(id), typeof(typeof(day)) FROM
$db1.$table1").collect()))(
- s"does not have [select] privilege on
[$db1/$table1/id,$db1/$table1/day]")
+ s"does not have [select] privilege on
[$db1/$table1/day,$db1/$table1/id]")
interceptEndsWith[AccessControlException](
doAs(
someone,
@@ -1331,7 +1330,7 @@ class HiveCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
|typeof(cast(id as string)),
|typeof(substring(day, 1, 3))
|FROM $db1.$table1""".stripMargin).collect()))(
- s"does not have [select] privilege on
[$db1/$table1/id,$db1/$table1/day]")
+ s"does not have [select] privilege on
[$db1/$table1/day,$db1/$table1/id]")
checkAnswer(
admin,
s"""
@@ -1414,60 +1413,76 @@ class HiveCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
}
}
- test("[KYUUBI #5884] PVM should inherit MultiInstance and wrap with new
exprId") {
+ test("[KYUUBI #5594][AUTHZ] BuildQuery should respect normal node's input ")
{
+ assume(!isSparkV35OrGreater, "mapInPandas not supported after spark 3.5")
val db1 = defaultDb
val table1 = "table1"
- val perm_view = "perm_view"
val view1 = "view1"
- val view2 = "view2"
- val view3 = "view3"
withSingleCallEnabled {
- withCleanTmpResources(Seq.empty) {
- sql("set spark.sql.legacy.storeAnalyzedPlanForView=true")
- doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1(id int,
scope int)"))
- doAs(admin, sql(s"CREATE VIEW $db1.$perm_view AS SELECT * FROM
$db1.$table1"))
-
- doAs(
- admin,
- sql(
- s"""
- |CREATE OR REPLACE TEMPORARY VIEW $view1 AS
- |SELECT *
- |FROM $db1.$perm_view
- |WHERE id > 10
- |""".stripMargin))
+ withCleanTmpResources(Seq((s"$db1.$table1", "table"), (s"$db1.$view1",
"view"))) {
+ doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int,
scope int)"))
+ doAs(admin, sql(s"CREATE VIEW $db1.$view1 AS SELECT * FROM
$db1.$table1"))
- doAs(
- admin,
- sql(
- s"""
- |CREATE OR REPLACE TEMPORARY VIEW $view2 AS
- |SELECT *
- |FROM $view1
- |WHERE scope < 10
- |""".stripMargin))
+ val table = spark.read.table(s"$db1.$table1")
+ val mapTableInPandasUDF = PythonUDF(
+ "mapInPandasUDF",
+ null,
+ StructType(Seq(StructField("id", IntegerType), StructField("scope",
IntegerType))),
+ table.queryExecution.analyzed.output,
+ 205,
+ true)
+ interceptContains[AccessControlException](
+ doAs(
+ someone,
+ invokeAs(
+ table,
+ "mapInPandas",
+ (classOf[PythonUDF], mapTableInPandasUDF))
+ .asInstanceOf[DataFrame].select(col("id"),
col("scope")).limit(1).show(true)))(
+ s"does not have [select] privilege on
[$db1/$table1/id,$db1/$table1/scope]")
- doAs(
- admin,
- sql(
- s"""
- |CREATE OR REPLACE TEMPORARY VIEW $view3 AS
- |SELECT *
- |FROM $view1
- |WHERE scope is not null
- |""".stripMargin))
+ val view = spark.read.table(s"$db1.$view1")
+ val mapViewInPandasUDF = PythonUDF(
+ "mapInPandasUDF",
+ null,
+ StructType(Seq(StructField("id", IntegerType), StructField("scope",
IntegerType))),
+ view.queryExecution.analyzed.output,
+ 205,
+ true)
+ interceptContains[AccessControlException](
+ doAs(
+ someone,
+ invokeAs(
+ view,
+ "mapInPandas",
+ (classOf[PythonUDF], mapViewInPandasUDF))
+ .asInstanceOf[DataFrame].select(col("id"),
col("scope")).limit(1).show(true)))(
+ s"does not have [select] privilege on
[$db1/$view1/id,$db1/$view1/scope]")
+ }
+ }
+ }
+ test("[KYUUBI #5594][AUTHZ] BuildQuery should respect sort agg input") {
+ val db1 = defaultDb
+ val table1 = "table1"
+ val view1 = "view1"
+ withSingleCallEnabled {
+ withCleanTmpResources(Seq((s"$db1.$table1", "table"), (s"$db1.$view1",
"view"))) {
+ doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int,
scope int)"))
+ doAs(admin, sql(s"CREATE VIEW $db1.$view1 AS SELECT * FROM
$db1.$table1"))
+ checkAnswer(
+ someone,
+ s"SELECT count(*) FROM $db1.$table1 WHERE id > 1",
+ Row(0) :: Nil)
+ checkAnswer(
+ someone,
+ s"SELECT count(*) FROM $db1.$view1 WHERE id > 1",
+ Row(0) :: Nil)
interceptContains[AccessControlException](
doAs(
someone,
- sql(
- s"""
- |SELECT a.*, b.scope as new_scope
- |FROM $view2 a
- |JOIN $view3 b
- |ON a.id == b.id
- |""".stripMargin).collect()))(s"does not have [select]
privilege on " +
-
s"[$db1/$perm_view/id,$db1/$perm_view/scope,$db1/$perm_view/scope,$db1/$perm_view/id]")
+ sql(s"SELECT count(id) FROM $db1.$view1 WHERE id > 1").collect()))(
+ s"does not have [select] privilege on [$db1/$view1/id]")
}
}
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
index 046052d55..3a22f45d5 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
@@ -107,20 +107,23 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSu
}
test("[KYUUBI #3424] CREATE TABLE") {
- // CreateTable
- val e2 = intercept[AccessControlException](
- doAs(someone, sql(s"CREATE TABLE IF NOT EXISTS
$catalogV2.$namespace1.$table2")))
- assert(e2.getMessage.contains(s"does not have [create] privilege" +
- s" on [$namespace1/$table2]"))
+ withSingleCallEnabled {
+ // CreateTable
+ val e2 = intercept[AccessControlException](
+ doAs(someone, sql(s"CREATE TABLE IF NOT EXISTS
$catalogV2.$namespace1.$table2")))
+ assert(e2.getMessage.contains(s"does not have [create] privilege" +
+ s" on [$namespace1/$table2]"))
- // CreateTableAsSelect
- val e21 = intercept[AccessControlException](
- doAs(
- someone,
- sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2" +
- s" AS select * from $catalogV2.$namespace1.$table1")))
- assert(e21.getMessage.contains(s"does not have [select] privilege" +
- s" on [$namespace1/$table1/id]"))
+ // CreateTableAsSelect
+
+ val e21 = intercept[AccessControlException](
+ doAs(
+ someone,
+ sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2" +
+ s" AS select * from $catalogV2.$namespace1.$table1")))
+ assert(e21.getMessage.contains(s"does not have [select] privilege" +
+ s" on
[$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
+ }
}
test("[KYUUBI #3424] DROP TABLE") {
@@ -133,69 +136,74 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSu
test("[KYUUBI #3424] INSERT TABLE") {
// AppendData: Insert Using a VALUES Clause
- val e4 = intercept[AccessControlException](
- doAs(
- someone,
- sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name,
city)" +
- s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
- assert(e4.getMessage.contains(s"does not have [update] privilege" +
- s" on [$namespace1/$outputTable1]"))
+ withSingleCallEnabled {
- // AppendData: Insert Using a TABLE Statement
- val e42 = intercept[AccessControlException](
- doAs(
- someone,
- sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name,
city)" +
- s" TABLE $catalogV2.$namespace1.$table1")))
- assert(e42.getMessage.contains(s"does not have [select] privilege" +
- s" on [$namespace1/$table1/id]"))
+ val e4 = intercept[AccessControlException](
+ doAs(
+ someone,
+ sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name,
city)" +
+ s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
+ assert(e4.getMessage.contains(s"does not have [update] privilege" +
+ s" on [$namespace1/$outputTable1]"))
- // AppendData: Insert Using a SELECT Statement
- val e43 = intercept[AccessControlException](
- doAs(
- someone,
- sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name,
city)" +
- s" SELECT * from $catalogV2.$namespace1.$table1")))
- assert(e43.getMessage.contains(s"does not have [select] privilege" +
- s" on [$namespace1/$table1/id]"))
+ // AppendData: Insert Using a TABLE Statement
+ val e42 = intercept[AccessControlException](
+ doAs(
+ someone,
+ sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name,
city)" +
+ s" TABLE $catalogV2.$namespace1.$table1")))
+ assert(e42.getMessage.contains(s"does not have [select] privilege" +
+ s" on
[$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
- // OverwriteByExpression: Insert Overwrite
- val e44 = intercept[AccessControlException](
- doAs(
- someone,
- sql(s"INSERT OVERWRITE $catalogV2.$namespace1.$outputTable1 (id, name,
city)" +
- s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
- assert(e44.getMessage.contains(s"does not have [update] privilege" +
- s" on [$namespace1/$outputTable1]"))
+ // AppendData: Insert Using a SELECT Statement
+ val e43 = intercept[AccessControlException](
+ doAs(
+ someone,
+ sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name,
city)" +
+ s" SELECT * from $catalogV2.$namespace1.$table1")))
+ assert(e43.getMessage.contains(s"does not have [select] privilege" +
+ s" on
[$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
+
+ // OverwriteByExpression: Insert Overwrite
+ val e44 = intercept[AccessControlException](
+ doAs(
+ someone,
+ sql(s"INSERT OVERWRITE $catalogV2.$namespace1.$outputTable1 (id,
name, city)" +
+ s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
+ assert(e44.getMessage.contains(s"does not have [update] privilege" +
+ s" on [$namespace1/$outputTable1]"))
+ }
}
test("[KYUUBI #3424] MERGE INTO") {
- val mergeIntoSql =
- s"""
- |MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target
- |USING $catalogV2.$namespace1.$table1 AS source
- |ON target.id = source.id
- |WHEN MATCHED AND (target.name='delete') THEN DELETE
- |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city
= source.city
+ withSingleCallEnabled {
+ val mergeIntoSql =
+ s"""
+ |MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target
+ |USING $catalogV2.$namespace1.$table1 AS source
+ |ON target.id = source.id
+ |WHEN MATCHED AND (target.name='delete') THEN DELETE
+ |WHEN MATCHED AND (target.name='update') THEN UPDATE SET
target.city = source.city
""".stripMargin
- // MergeIntoTable: Using a MERGE INTO Statement
- val e1 = intercept[AccessControlException](
- doAs(
- someone,
- sql(mergeIntoSql)))
- assert(e1.getMessage.contains(s"does not have [select] privilege" +
- s" on [$namespace1/$table1/id]"))
-
- withSingleCallEnabled {
- val e2 = intercept[AccessControlException](
+ // MergeIntoTable: Using a MERGE INTO Statement
+ val e1 = intercept[AccessControlException](
doAs(
someone,
sql(mergeIntoSql)))
- assert(e2.getMessage.contains(s"does not have" +
- s" [select] privilege" +
- s" on
[$namespace1/$table1/id,$namespace1/table1/name,$namespace1/$table1/city]," +
- s" [update] privilege on [$namespace1/$outputTable1]"))
+ assert(e1.getMessage.contains(s"does not have [select] privilege" +
+ s" on
[$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
+
+ withSingleCallEnabled {
+ val e2 = intercept[AccessControlException](
+ doAs(
+ someone,
+ sql(mergeIntoSql)))
+ assert(e2.getMessage.contains(s"does not have" +
+ s" [select] privilege" +
+ s" on
[$namespace1/$table1/city,$namespace1/table1/id,$namespace1/$table1/name]," +
+ s" [update] privilege on [$namespace1/$outputTable1]"))
+ }
}
}
@@ -220,17 +228,14 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSu
test("[KYUUBI #3424] CACHE TABLE") {
// CacheTable
- val e7 = intercept[AccessControlException](
- doAs(
- someone,
- sql(s"CACHE TABLE $cacheTable1" +
- s" AS select * from $catalogV2.$namespace1.$table1")))
- if (isSparkV32OrGreater) {
- assert(e7.getMessage.contains(s"does not have [select] privilege" +
- s" on [$namespace1/$table1/id]"))
- } else {
+ withSingleCallEnabled {
+ val e7 = intercept[AccessControlException](
+ doAs(
+ someone,
+ sql(s"CACHE TABLE $cacheTable1" +
+ s" AS select * from $catalogV2.$namespace1.$table1")))
assert(e7.getMessage.contains(s"does not have [select] privilege" +
- s" on [$catalogV2.$namespace1/$table1]"))
+ s" on
[$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
}
}