This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new abadb05cc [KYUUBI #3608][Subtask] Get table owner for queries run on
SessionCatalog
abadb05cc is described below
commit abadb05cc434e2d440c76e02fe74dcf3839dd734
Author: zhouyifan279 <[email protected]>
AuthorDate: Thu Oct 20 16:54:03 2022 +0800
[KYUUBI #3608][Subtask] Get table owner for queries run on SessionCatalog
### _Why are the changes needed?_
Subtask of #3607
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #3665 from zhouyifan279/3608.
Closes #3608
bcea68d8 [zhouyifan279] DataSourceV2Relation
a50f0a72 [zhouyifan279] Empty-Commit
5c8e2e7b [zhouyifan279] Fix typo
a2c3ab70 [zhouyifan279] Fix typo
31e730ea [zhouyifan279] Move test from RangerSparkExtensionSuite to
HiveCatalogRangerSparkExtensionSuite
35c627ec [zhouyifan279] Add `hasTableOwner` field in `SparkSessionProvider`
49a08e0d [zhouyifan279] Improve code style
2d7e6be1 [zhouyifan279] Run test "[KYUUBI #3607] Support {OWNER} variable
defined in Ranger Policy" if owner can be resolved
e975f36a [zhouyifan279] [Subtask] Get table owner from CatalogTable #3608
da9f52ce [zhouyifan279] [Subtask] Get table owner from CatalogTable #3608
Authored-by: zhouyifan279 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../plugin/spark/authz/PrivilegeObject.scala | 3 +-
.../plugin/spark/authz/PrivilegesBuilder.scala | 9 ++--
.../plugin/spark/authz/ranger/AccessResource.scala | 6 ++-
.../spark/authz/ranger/RuleAuthorization.scala | 8 +++-
.../src/test/resources/sparkSql_hive_jenkins.json | 53 +++++++++++++++++++++-
.../spark/authz/PrivilegesBuilderSuite.scala | 25 ++++++++++
.../plugin/spark/authz/SparkSessionProvider.scala | 14 +++++-
.../spark/authz/ranger/AccessResourceSuite.scala | 4 +-
.../authz/ranger/RangerSparkExtensionSuite.scala | 29 +++++++++++-
9 files changed, 140 insertions(+), 11 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegeObject.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegeObject.scala
index 24143eda1..053425be7 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegeObject.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegeObject.scala
@@ -43,4 +43,5 @@ case class PrivilegeObject(
actionType: PrivilegeObjectActionType,
dbname: String,
objectName: String,
- @Nonnull columns: Seq[String] = Nil)
+ @Nonnull columns: Seq[String] = Nil,
+ owner: Option[String] = None)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index b74e68940..89445d224 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -45,8 +45,9 @@ object PrivilegesBuilder {
private def tablePrivileges(
table: TableIdentifier,
columns: Seq[String] = Nil,
+ owner: Option[String] = None,
actionType: PrivilegeObjectActionType =
PrivilegeObjectActionType.OTHER): PrivilegeObject = {
- PrivilegeObject(TABLE_OR_VIEW, actionType, table.database.orNull,
table.table, columns)
+ PrivilegeObject(TABLE_OR_VIEW, actionType, table.database.orNull,
table.table, columns, owner)
}
private def functionPrivileges(
@@ -97,14 +98,16 @@ object PrivilegesBuilder {
conditionList: Seq[NamedExpression] = Nil): Unit = {
def mergeProjection(table: CatalogTable, plan: LogicalPlan): Unit = {
+ val tableOwner = Option(table.owner).filter(_.nonEmpty)
if (projectionList.isEmpty) {
privilegeObjects += tablePrivileges(
table.identifier,
- table.schema.fieldNames)
+ table.schema.fieldNames,
+ tableOwner)
} else {
val cols = (projectionList ++ conditionList).flatMap(collectLeaves)
.filter(plan.outputSet.contains).map(_.name).distinct
- privilegeObjects += tablePrivileges(table.identifier, cols)
+ privilegeObjects += tablePrivileges(table.identifier, cols, tableOwner)
}
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala
index b26b8bf82..93ade47b0 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala
@@ -42,7 +42,8 @@ object AccessResource {
objectType: ObjectType,
firstLevelResource: String,
secondLevelResource: String,
- thirdLevelResource: String): AccessResource = {
+ thirdLevelResource: String,
+ owner: Option[String] = None): AccessResource = {
val resource = new AccessResource(objectType)
resource.objectType match {
@@ -59,6 +60,7 @@ object AccessResource {
resource.setValue("table", secondLevelResource)
}
resource.setServiceDef(SparkRangerAdminPlugin.getServiceDef)
+ owner.foreach(resource.setOwnerUser)
resource
}
@@ -67,6 +69,6 @@ object AccessResource {
}
def apply(obj: PrivilegeObject, opType: OperationType): AccessResource = {
- apply(ObjectType(obj, opType), obj.dbname, obj.objectName,
obj.columns.mkString(","))
+ apply(ObjectType(obj, opType), obj.dbname, obj.objectName,
obj.columns.mkString(","), obj.owner)
}
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
index a5f7e53f9..2c946adbc 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
@@ -74,7 +74,13 @@ object RuleAuthorization {
resource.objectType match {
case ObjectType.COLUMN if resource.getColumns.nonEmpty =>
resource.getColumns.map { col =>
- val cr = AccessResource(COLUMN, resource.getDatabase,
resource.getTable, col)
+ val cr =
+ AccessResource(
+ COLUMN,
+ resource.getDatabase,
+ resource.getTable,
+ col,
+ Option(resource.getOwnerUser))
AccessRequest(cr, ugi, opType,
request.accessType).asInstanceOf[RangerAccessRequest]
}
case _ => Seq(request)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
index 3e69607c2..64ffa09c8 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
+++
b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
@@ -345,7 +345,58 @@
],
"users": [
"bob",
- "perm_view_user"
+ "perm_view_user",
+ "{OWNER}"
+ ],
+ "groups": [],
+ "conditions": [],
+ "delegateAdmin": false
+ }, {
+ "accesses": [
+ {
+ "type": "select",
+ "isAllowed": false
+ },
+ {
+ "type": "update",
+ "isAllowed": false
+ },
+ {
+ "type": "create",
+ "isAllowed": true
+ },
+ {
+ "type": "drop",
+ "isAllowed": false
+ },
+ {
+ "type": "alter",
+ "isAllowed": false
+ },
+ {
+ "type": "index",
+ "isAllowed": false
+ },
+ {
+ "type": "lock",
+ "isAllowed": false
+ },
+ {
+ "type": "all",
+ "isAllowed": false
+ },
+ {
+ "type": "read",
+ "isAllowed": false
+ },
+ {
+ "type": "write",
+ "isAllowed": false
+ }
+ ],
+ "users": [
+ "default_table_owner",
+ "create_only_user"
],
"groups": [],
"conditions": [],
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
index 5ed1a1905..0396b2293 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
@@ -60,12 +60,20 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po.actionType === PrivilegeObjectActionType.OTHER)
assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
assert(po.columns === cols)
+ checkTableOwner(po)
}
protected def checkColumns(query: String, cols: Seq[String]): Unit = {
checkColumns(sql(query).queryExecution.optimizedPlan, cols)
}
+ protected def checkTableOwner(po: PrivilegeObject): Unit = {
+ if (catalogImpl == "hive" && po.privilegeObjectType ===
PrivilegeObjectType.TABLE_OR_VIEW) {
+ assert(po.owner.isDefined)
+ assert(po.owner.get === defaultTableOwner)
+ }
+ }
+
protected val reusedDb: String = getClass.getSimpleName
protected val reusedTable: String = reusedDb + "." + getClass.getSimpleName
protected val reusedTableShort: String = reusedTable.split("\\.").last
@@ -452,6 +460,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po0.dbname equalsIgnoreCase reusedDb)
assert(po0.objectName equalsIgnoreCase reusedDb)
assert(po0.columns.head === "key")
+ checkTableOwner(po0)
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)
} else {
@@ -475,6 +484,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po0.objectName equalsIgnoreCase reusedTable.split("\\.").last)
if (isSparkV32OrGreater) {
assert(po0.columns.head === "key")
+ checkTableOwner(po0)
} else {
assert(po0.columns.isEmpty)
}
@@ -498,6 +508,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po0.objectName equalsIgnoreCase reusedTable.split("\\.").last)
if (isSparkV32OrGreater) {
assert(po0.columns === Seq("key", "value"))
+ checkTableOwner(po0)
} else {
assert(po0.columns.isEmpty)
}
@@ -887,6 +898,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(po.dbname equalsIgnoreCase reusedDb)
assert(po.objectName equalsIgnoreCase reusedTableShort)
assert(po.columns.take(2) === Seq("key", "value"))
+ checkTableOwner(po)
}
test("Query: Projection") {
@@ -964,6 +976,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(
po.columns === Seq("value", "pid", "key"),
s"$reusedPartTable both 'key', 'value' and 'pid' should be
authenticated")
+ checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = true)
assert(accessType === AccessType.SELECT)
}
@@ -990,6 +1003,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(
po.columns === Seq("value", "key", "pid"),
s"$reusedPartTable both 'key', 'value' and 'pid' should be
authenticated")
+ checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = true)
assert(accessType === AccessType.SELECT)
}
@@ -1019,6 +1033,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(
po.columns === Seq("key", "value"),
s"$reusedPartTable 'key' is the join key and 'pid' is omitted")
+ checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = true)
assert(accessType === AccessType.SELECT)
}
@@ -1047,6 +1062,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(
po.columns === Seq("key", "value"),
s"$reusedPartTable both 'key' and 'value' should be authenticated")
+ checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = true)
assert(accessType === AccessType.SELECT)
}
@@ -1076,6 +1092,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(
po.columns === Seq("key", "value"),
s"$reusedPartTable both 'key' and 'value' should be authenticated")
+ checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = true)
assert(accessType === AccessType.SELECT)
}
@@ -1101,6 +1118,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(
po.columns === Seq("key", "value"),
s"$reusedPartTable both 'key' and 'value' should be authenticated")
+ checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = true)
assert(accessType === AccessType.SELECT)
}
@@ -1126,6 +1144,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
assert(
po.columns === Seq("key", "value", "pid"),
s"$reusedPartTable both 'key', 'value' and 'pid' should be
authenticated")
+ checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = true)
assert(accessType === AccessType.SELECT)
}
@@ -1230,6 +1249,7 @@ class InMemoryPrivilegeBuilderSuite extends
PrivilegesBuilderSuite {
assert(po0.dbname equalsIgnoreCase reusedDb)
assert(po0.objectName equalsIgnoreCase reusedTable.split("\\.").last)
assert(po0.columns === Seq("key", "value"))
+ checkTableOwner(po0)
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)
@@ -1314,6 +1334,7 @@ class HiveCatalogPrivilegeBuilderSuite extends
PrivilegesBuilderSuite {
assert(po0.dbname equalsIgnoreCase reusedDb)
assert(po0.objectName equalsIgnoreCase reusedTable.split("\\.").last)
assert(po0.columns === Seq("key", "value"))
+ checkTableOwner(po0)
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)
@@ -1378,6 +1399,7 @@ class HiveCatalogPrivilegeBuilderSuite extends
PrivilegesBuilderSuite {
assert(po0.dbname equalsIgnoreCase reusedDb)
assert(po0.objectName equalsIgnoreCase reusedPartTable.split("\\.").last)
assert(po0.columns === Seq("key", "value", "pid"))
+ checkTableOwner(po0)
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)
@@ -1423,6 +1445,7 @@ class HiveCatalogPrivilegeBuilderSuite extends
PrivilegesBuilderSuite {
assert(po.dbname equalsIgnoreCase reusedDb)
assert(po.objectName equalsIgnoreCase reusedTable.split("\\.").last)
assert(po.columns === Seq("key", "value"))
+ checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = true)
assert(accessType === AccessType.SELECT)
}
@@ -1463,6 +1486,7 @@ class HiveCatalogPrivilegeBuilderSuite extends
PrivilegesBuilderSuite {
assert(po.dbname equalsIgnoreCase reusedDb)
assert(po.objectName equalsIgnoreCase reusedTable.split("\\.").last)
assert(po.columns === Seq("key", "value"))
+ checkTableOwner(po)
val accessType = ranger.AccessType(po, operationType, isInput = false)
assert(accessType === AccessType.SELECT)
}
@@ -1491,6 +1515,7 @@ class HiveCatalogPrivilegeBuilderSuite extends
PrivilegesBuilderSuite {
assert(po0.dbname equalsIgnoreCase reusedDb)
assert(po0.objectName equalsIgnoreCase reusedPartTable.split("\\.").last)
assert(po0.columns === Seq("key", "value", "pid"))
+ checkTableOwner(po0)
val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
assert(accessType0 === AccessType.SELECT)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
index 06ab61f38..fb2a3aa66 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/SparkSessionProvider.scala
@@ -18,7 +18,9 @@
package org.apache.kyuubi.plugin.spark.authz
import java.nio.file.Files
+import java.security.PrivilegedExceptionAction
+import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.sql.{DataFrame, SparkSession, SparkSessionExtensions}
import org.apache.kyuubi.Utils
@@ -35,13 +37,15 @@ trait SparkSessionProvider {
protected val extension: SparkSessionExtensions => Unit = _ => Unit
protected val sqlExtensions: String = ""
+ protected val defaultTableOwner = "default_table_owner"
+
protected lazy val spark: SparkSession = {
val metastore = {
val path = Utils.createTempDir(prefix = "hms")
Files.delete(path)
path
}
- SparkSession.builder()
+ val ret = SparkSession.builder()
.master("local")
.config("spark.ui.enabled", "false")
.config("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastore;create=true")
@@ -52,6 +56,14 @@ trait SparkSessionProvider {
.config("spark.sql.extensions", sqlExtensions)
.withExtensions(extension)
.getOrCreate()
+ if (catalogImpl == "hive") {
+ // Ensure HiveExternalCatalog.client.userName is defaultTableOwner
+ UserGroupInformation.createRemoteUser(defaultTableOwner).doAs(
+ new PrivilegedExceptionAction[Unit] {
+ override def run(): Unit = ret.catalog.listDatabases()
+ })
+ }
+ ret
}
protected val sql: String => DataFrame = spark.sql
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResourceSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResourceSuite.scala
index 25ced2008..f1259c65d 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResourceSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResourceSuite.scala
@@ -31,11 +31,13 @@ class AccessResourceSuite extends AnyFunSuite {
assert(resource.getColumn === null)
assert(resource.getColumns.isEmpty)
- val resource1 = AccessResource(DATABASE, null, "my_table_name",
"my_col_1,my_col_2")
+ val resource1 =
+ AccessResource(DATABASE, null, "my_table_name", "my_col_1,my_col_2",
Some("Bob"))
assert(resource1.getDatabase === null)
assert(resource1.getTable === null)
assert(resource1.getColumn === null)
assert(resource1.getColumns.isEmpty)
+ assert(resource1.getOwnerUser === "Bob")
val resource2 = AccessResource(FUNCTION, "my_db_name", "my_func_name",
null)
assert(resource2.getDatabase === "my_db_name")
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 8eac75ddb..a8d41a63c 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -55,7 +55,7 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite
super.afterAll()
}
- private def errorMessage(
+ protected def errorMessage(
privilege: String,
resource: String = "default/src",
user: String = UserGroupInformation.getCurrentUser.getShortUserName):
String = {
@@ -865,4 +865,31 @@ class HiveCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
}
}
}
+
+ test("[KYUUBI #3608] Support {OWNER} variable for queries") {
+ val db = "default"
+ val table = "owner_variable"
+
+ val select = s"SELECT key FROM $db.$table"
+
+ withCleanTmpResources(Seq((s"$db.$table", "table"))) {
+ doAs(
+ defaultTableOwner,
+ assert(Try {
+ sql(s"CREATE TABLE $db.$table (key int, value int) USING $format")
+ }.isSuccess))
+
+ doAs(
+ defaultTableOwner,
+ assert(Try {
+ sql(select).collect()
+ }.isSuccess))
+
+ doAs(
+ "create_only_user", {
+ val e = intercept[AccessControlException](sql(select).collect())
+ assert(e.getMessage === errorMessage("select", s"$db/$table/key"))
+ })
+ }
+ }
}