This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new b2210d928 [KYUUBI #3666][Subtask] Support {OWNER} variable for queries 
run on CatalogV2
b2210d928 is described below

commit b2210d928c953d87c5fbc9faee78d7c69c8d0d26
Author: zhouyifan279 <[email protected]>
AuthorDate: Fri Oct 21 21:31:19 2022 +0800

    [KYUUBI #3666][Subtask] Support {OWNER} variable for queries run on 
CatalogV2
    
    ### _Why are the changes needed?_
    Subtask of #3607
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before making a pull request
    
    Closes #3674 from zhouyifan279/3666.
    
    Closes #3666
    
    09f85adb [zhouyifan279] Run test on Spark 3.2 or greater
    e33551cd [zhouyifan279] [KYUUBI #3666] Support {OWNER} variable for queries 
run on CatalogV2
    
    Authored-by: zhouyifan279 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../plugin/spark/authz/PrivilegesBuilder.scala     | 14 +++++++---
 .../plugin/spark/authz/util/AuthZUtils.scala       |  8 ++++++
 .../kyuubi/plugin/spark/authz/v2Commands.scala     |  9 ++++++-
 .../src/test/resources/sparkSql_hive_jenkins.json  |  3 ++-
 .../IcebergCatalogRangerSparkExtensionSuite.scala  | 30 +++++++++++++++++++++-
 5 files changed, 57 insertions(+), 7 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index fe636e8d5..56a32be74 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -121,13 +121,16 @@ object PrivilegesBuilder {
       }
     }
 
-    def mergeProjectionV2Table(table: Identifier, plan: LogicalPlan): Unit = {
+    def mergeProjectionV2Table(
+        table: Identifier,
+        plan: LogicalPlan,
+        owner: Option[String] = None): Unit = {
       if (projectionList.isEmpty) {
-        privilegeObjects += v2TablePrivileges(table, plan.output.map(_.name))
+        privilegeObjects += v2TablePrivileges(table, plan.output.map(_.name), 
owner)
       } else {
         val cols = (projectionList ++ conditionList).flatMap(collectLeaves)
           .filter(plan.outputSet.contains).map(_.name).distinct
-        privilegeObjects += v2TablePrivileges(table, cols)
+        privilegeObjects += v2TablePrivileges(table, cols, owner)
       }
     }
 
@@ -166,7 +169,10 @@ object PrivilegesBuilder {
       case datasourceV2Relation if 
hasResolvedDatasourceV2Table(datasourceV2Relation) =>
         val tableIdent = getDatasourceV2Identifier(datasourceV2Relation)
         if (tableIdent.isDefined) {
-          mergeProjectionV2Table(tableIdent.get, plan)
+          mergeProjectionV2Table(
+            tableIdent.get,
+            plan,
+            getDatasourceV2TableOwner(datasourceV2Relation))
         }
 
       case u if u.nodeName == "UnresolvedRelation" =>
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index 270acb7d5..373706bdf 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.plugin.spark.authz.util
 
+import java.util
+
 import scala.util.{Failure, Success, Try}
 
 import org.apache.hadoop.security.UserGroupInformation
@@ -105,6 +107,12 @@ private[authz] object AuthZUtils {
     getFieldVal[Option[Identifier]](plan, "identifier")
   }
 
+  def getDatasourceV2TableOwner(plan: LogicalPlan): Option[String] = {
+    val table = getFieldVal[AnyRef](plan, "table")
+    val properties = invoke(table, "properties").asInstanceOf[util.Map[String, 
String]]
+    Option(properties.get("owner"))
+  }
+
   def getTableIdentifierFromV2Identifier(id: Identifier): TableIdentifier = {
     TableIdentifier(id.name(), Some(quote(id.namespace())))
   }
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
index 8ed6ca161..e7e647b53 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
@@ -151,8 +151,15 @@ object v2Commands extends Enumeration {
   def v2TablePrivileges(
       table: Identifier,
       columns: Seq[String] = Nil,
+      owner: Option[String] = None,
       actionType: PrivilegeObjectActionType = 
PrivilegeObjectActionType.OTHER): PrivilegeObject = {
-    PrivilegeObject(TABLE_OR_VIEW, actionType, quote(table.namespace()), 
table.name(), columns)
+    PrivilegeObject(
+      TABLE_OR_VIEW,
+      actionType,
+      quote(table.namespace()),
+      table.name(),
+      columns,
+      owner)
   }
 
   // namespace commands
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
 
b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
index 64ffa09c8..b5b069c46 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json
@@ -279,7 +279,8 @@
         "database": {
           "values": [
             "default",
-            "spark_catalog"
+            "spark_catalog",
+            "iceberg_ns"
           ],
           "isExcludes": false,
           "isRecursive": false
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
index 531af4d3e..616c18953 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -17,6 +17,7 @@
 package org.apache.kyuubi.plugin.spark.authz.ranger
 
 // scalastyle:off
+import scala.util.Try
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.plugin.spark.authz.AccessControlException
@@ -33,7 +34,7 @@ class IcebergCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite
     else ""
 
   val catalogV2 = "local"
-  val namespace1 = "ns1"
+  val namespace1 = "iceberg_ns"
   val table1 = "table1"
   val outputTable1 = "outputTable1"
 
@@ -142,4 +143,31 @@ class IcebergCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite
 
     doAs("admin", sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE 
id=2"))
   }
+
+  test("[KYUUBI #3666] Support {OWNER} variable for queries run on CatalogV2") 
{
+    assume(isSparkV32OrGreater)
+
+    val table = "owner_variable"
+    val select = s"SELECT key FROM $catalogV2.$namespace1.$table"
+
+    withCleanTmpResources(Seq((s"$catalogV2.$namespace1.$table", "table"))) {
+      doAs(
+        defaultTableOwner,
+        assert(Try {
+          sql(s"CREATE TABLE $catalogV2.$namespace1.$table (key int, value 
int) USING iceberg")
+        }.isSuccess))
+
+      doAs(
+        defaultTableOwner,
+        assert(Try {
+          sql(select).collect()
+        }.isSuccess))
+
+      doAs(
+        "create_only_user", {
+          val e = intercept[AccessControlException](sql(select).collect())
+          assert(e.getMessage === errorMessage("select", 
s"$namespace1/$table/key"))
+        })
+    }
+  }
 }

Reply via email to