This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new b2210d928 [KYUUBI #3666][Subtask] Support {OWNER} variable for queries
run on CatalogV2
b2210d928 is described below
commit b2210d928c953d87c5fbc9faee78d7c69c8d0d26
Author: zhouyifan279 <[email protected]>
AuthorDate: Fri Oct 21 21:31:19 2022 +0800
[KYUUBI #3666][Subtask] Support {OWNER} variable for queries run on
CatalogV2
### _Why are the changes needed?_
Subtask of #3607
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #3674 from zhouyifan279/3666.
Closes #3666
09f85adb [zhouyifan279] Run test on Spark 3.2 or greater
e33551cd [zhouyifan279] [KYUUBI #3666] Support {OWNER} variable for queries
run on CatalogV2
Authored-by: zhouyifan279 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../plugin/spark/authz/PrivilegesBuilder.scala | 14 +++++++---
.../plugin/spark/authz/util/AuthZUtils.scala | 8 ++++++
.../kyuubi/plugin/spark/authz/v2Commands.scala | 9 ++++++-
.../src/test/resources/sparkSql_hive_jenkins.json | 3 ++-
.../IcebergCatalogRangerSparkExtensionSuite.scala | 30 +++++++++++++++++++++-
5 files changed, 57 insertions(+), 7 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index fe636e8d5..56a32be74 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -121,13 +121,16 @@ object PrivilegesBuilder {
}
}
- def mergeProjectionV2Table(table: Identifier, plan: LogicalPlan): Unit = {
+ def mergeProjectionV2Table(
+ table: Identifier,
+ plan: LogicalPlan,
+ owner: Option[String] = None): Unit = {
if (projectionList.isEmpty) {
- privilegeObjects += v2TablePrivileges(table, plan.output.map(_.name))
+ privilegeObjects += v2TablePrivileges(table, plan.output.map(_.name),
owner)
} else {
val cols = (projectionList ++ conditionList).flatMap(collectLeaves)
.filter(plan.outputSet.contains).map(_.name).distinct
- privilegeObjects += v2TablePrivileges(table, cols)
+ privilegeObjects += v2TablePrivileges(table, cols, owner)
}
}
@@ -166,7 +169,10 @@ object PrivilegesBuilder {
case datasourceV2Relation if
hasResolvedDatasourceV2Table(datasourceV2Relation) =>
val tableIdent = getDatasourceV2Identifier(datasourceV2Relation)
if (tableIdent.isDefined) {
- mergeProjectionV2Table(tableIdent.get, plan)
+ mergeProjectionV2Table(
+ tableIdent.get,
+ plan,
+ getDatasourceV2TableOwner(datasourceV2Relation))
}
case u if u.nodeName == "UnresolvedRelation" =>
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index 270acb7d5..373706bdf 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.plugin.spark.authz.util
+import java.util
+
import scala.util.{Failure, Success, Try}
import org.apache.hadoop.security.UserGroupInformation
@@ -105,6 +107,12 @@ private[authz] object AuthZUtils {
getFieldVal[Option[Identifier]](plan, "identifier")
}
+ def getDatasourceV2TableOwner(plan: LogicalPlan): Option[String] = {
+ val table = getFieldVal[AnyRef](plan, "table")
+ val properties = invoke(table, "properties").asInstanceOf[util.Map[String,
String]]
+ Option(properties.get("owner"))
+ }
+
def getTableIdentifierFromV2Identifier(id: Identifier): TableIdentifier = {
TableIdentifier(id.name(), Some(quote(id.namespace())))
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
index 8ed6ca161..e7e647b53 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
@@ -151,8 +151,15 @@ object v2Commands extends Enumeration {
def v2TablePrivileges(
table: Identifier,
columns: Seq[String] = Nil,
+ owner: Option[String] = None,
actionType: PrivilegeObjectActionType =
PrivilegeObjectActionType.OTHER): PrivilegeObject = {
- PrivilegeObject(TABLE_OR_VIEW, actionType, quote(table.namespace()),
table.name(), columns)
+ PrivilegeObject(
+ TABLE_OR_VIEW,
+ actionType,
+ quote(table.namespace()),
+ table.name(),
+ columns,
+ owner)
}
// namespace commands
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
index 64ffa09c8..b5b069c46 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
+++
b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
@@ -279,7 +279,8 @@
"database": {
"values": [
"default",
- "spark_catalog"
+ "spark_catalog",
+ "iceberg_ns"
],
"isExcludes": false,
"isRecursive": false
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
index 531af4d3e..616c18953 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.plugin.spark.authz.ranger
// scalastyle:off
+import scala.util.Try
import org.apache.kyuubi.Utils
import org.apache.kyuubi.plugin.spark.authz.AccessControlException
@@ -33,7 +34,7 @@ class IcebergCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite
else ""
val catalogV2 = "local"
- val namespace1 = "ns1"
+ val namespace1 = "iceberg_ns"
val table1 = "table1"
val outputTable1 = "outputTable1"
@@ -142,4 +143,31 @@ class IcebergCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite
doAs("admin", sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE
id=2"))
}
+
+ test("[KYUUBI #3666] Support {OWNER} variable for queries run on CatalogV2")
{
+ assume(isSparkV32OrGreater)
+
+ val table = "owner_variable"
+ val select = s"SELECT key FROM $catalogV2.$namespace1.$table"
+
+ withCleanTmpResources(Seq((s"$catalogV2.$namespace1.$table", "table"))) {
+ doAs(
+ defaultTableOwner,
+ assert(Try {
+ sql(s"CREATE TABLE $catalogV2.$namespace1.$table (key int, value
int) USING iceberg")
+ }.isSuccess))
+
+ doAs(
+ defaultTableOwner,
+ assert(Try {
+ sql(select).collect()
+ }.isSuccess))
+
+ doAs(
+ "create_only_user", {
+ val e = intercept[AccessControlException](sql(select).collect())
+ assert(e.getMessage === errorMessage("select",
s"$namespace1/$table/key"))
+ })
+ }
+ }
}