This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 23f32cf15 [KYUUBI #5690][AUTHZ] Support insert into/overwrite 
path-based table for Delta Lake in Authz
23f32cf15 is described below

commit 23f32cf1512fe28839e2af450a5f733fa257cbeb
Author: zml1206 <[email protected]>
AuthorDate: Thu Nov 16 10:12:43 2023 +0800

    [KYUUBI #5690][AUTHZ] Support insert into/overwrite path-based table for 
Delta Lake in Authz
    
    ### _Why are the changes needed?_
    To close #5690.
    Support insert into/overwrite path-based table for Delta Lake in Authz 
plugin.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
 locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    No.
    
    Closes #5691 from zml1206/KYUUBI-5690.
    
    Closes #5690
    
    e1506ca93 [zml1206] update
    704ce2c6e [zml1206] fix
    bf3471109 [zml1206] Support insert into/overwrite path-based table for 
Delta Lake in Authz
    
    Authored-by: zml1206 <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 ...he.kyuubi.plugin.spark.authz.serde.URIExtractor |  1 +
 .../src/main/resources/table_command_spec.json     | 18 ++++++++++---
 .../plugin/spark/authz/serde/tableExtractors.scala | 15 ++++++-----
 .../plugin/spark/authz/serde/uriExtractors.scala   | 16 +++++++++++-
 .../plugin/spark/authz/gen/TableCommands.scala     |  6 +++--
 .../DeltaCatalogRangerSparkExtensionSuite.scala    | 30 ++++++++++++++++++++++
 6 files changed, 73 insertions(+), 13 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor
 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor
index fb869f602..60f761233 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor
@@ -18,6 +18,7 @@
 org.apache.kyuubi.plugin.spark.authz.serde.BaseRelationFileIndexURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogStorageFormatURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableURIExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.IdentifierURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.PartitionLocsSeqURIExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.PropertiesLocationUriExtractor
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 57fa3b267..4442e6868 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -79,7 +79,11 @@
     "fieldName" : "query",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+    "isInput" : false
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.CacheTable",
   "tableDescs" : [ ],
@@ -365,7 +369,11 @@
     "fieldName" : "query",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+    "isInput" : false
+  } ]
 }, {
   "classname" : 
"org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic",
   "tableDescs" : [ {
@@ -387,7 +395,11 @@
     "fieldName" : "query",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ],
-  "uriDescs" : [ ]
+  "uriDescs" : [ {
+    "fieldName" : "table",
+    "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+    "isInput" : false
+  } ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.RefreshTable",
   "tableDescs" : [ {
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index ce595c66b..82448f9cd 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -186,18 +187,18 @@ class ExpressionSeqTableExtractor extends TableExtractor {
 class DataSourceV2RelationTableExtractor extends TableExtractor {
   override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
     val plan = v1.asInstanceOf[LogicalPlan]
-    val maybeV2Relation = plan.find(_.getClass.getSimpleName == 
"DataSourceV2Relation")
-    maybeV2Relation match {
-      case None => None
-      case Some(v2Relation) =>
-        val maybeCatalogPlugin = invokeAs[Option[AnyRef]](v2Relation, 
"catalog")
-        val maybeCatalog = maybeCatalogPlugin.flatMap(catalogPlugin =>
+    plan.find(_.getClass.getSimpleName == "DataSourceV2Relation").get match {
+      case v2Relation: DataSourceV2Relation
+          if v2Relation.identifier == None ||
+            !isPathIdentifier(v2Relation.identifier.get.name(), spark) =>
+        val maybeCatalog = v2Relation.catalog.flatMap(catalogPlugin =>
           lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogPlugin))
-        lookupExtractor[TableTableExtractor].apply(spark, 
invokeAs[AnyRef](v2Relation, "table"))
+        lookupExtractor[TableTableExtractor].apply(spark, v2Relation.table)
           .map { table =>
             val maybeOwner = TableExtractor.getOwner(v2Relation)
             table.copy(catalog = maybeCatalog, owner = maybeOwner)
           }
+      case _ => None
     }
   }
 }
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
index 7d4833516..eff842746 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
@@ -19,9 +19,10 @@ package org.apache.kyuubi.plugin.spark.authz.serde
 
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, 
CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 
 import org.apache.kyuubi.plugin.spark.authz.util.PathIdentifier._
 import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
@@ -115,3 +116,16 @@ class SubqueryAliasURIExtractor extends URIExtractor {
       Seq(identifier.name).map(Uri)
   }
 }
+
+class DataSourceV2RelationURIExtractor extends URIExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
+    val plan = v1.asInstanceOf[LogicalPlan]
+    plan.find(_.getClass.getSimpleName == "DataSourceV2Relation").get match {
+      case v2Relation: DataSourceV2Relation
+          if v2Relation.identifier != None &&
+            isPathIdentifier(v2Relation.identifier.get.name, spark) =>
+        Seq(v2Relation.identifier.get.name).map(Uri)
+      case _ => Nil
+    }
+  }
+}
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
index 4d7dc2ac5..ee0345a8c 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
@@ -270,7 +270,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] 
{
         "table",
         classOf[DataSourceV2RelationTableExtractor],
         actionTypeDesc = Some(actionTypeDesc))
-    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+    val uriDescs = Seq(UriDesc("table", 
classOf[DataSourceV2RelationURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc), 
uriDescs = uriDescs)
   }
 
   val ReplaceData = {
@@ -308,7 +309,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] 
{
         "table",
         classOf[DataSourceV2RelationTableExtractor],
         actionTypeDesc = Some(actionTypeDesc))
-    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+    val uriDescs = Seq(UriDesc("table", 
classOf[DataSourceV2RelationURIExtractor]))
+    TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc), 
uriDescs = uriDescs)
   }
 
   val OverwritePartitionsDynamic = {
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
index 514b7970b..331bd380d 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
@@ -357,6 +357,36 @@ class DeltaCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite {
       doAs(admin, sql(updateTableSql))
     })
   }
+
+  test("insert path-based table") {
+    withSingleCallEnabled {
+      withCleanTmpResources(Seq((s"$namespace1.$table2", "table"), 
(s"$namespace1", "database"))) {
+        doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+        doAs(admin, sql(createTableSql(namespace1, table2)))
+        withTempDir(path => {
+          doAs(admin, sql(createPathBasedTableSql(path)))
+          // insert into
+          val insertIntoSql = s"INSERT INTO delta.`$path` SELECT * FROM 
$namespace1.$table2"
+          interceptContains[AccessControlException](
+            doAs(someone, sql(insertIntoSql)))(
+            s"does not have [select] privilege on [$namespace1/$table2/id," +
+              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
+              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, 
$path/]]")
+          doAs(admin, sql(insertIntoSql))
+
+          // insert overwrite
+          val insertOverwriteSql =
+            s"INSERT OVERWRITE delta.`$path` SELECT * FROM $namespace1.$table2"
+          interceptContains[AccessControlException](
+            doAs(someone, sql(insertOverwriteSql)))(
+            s"does not have [select] privilege on [$namespace1/$table2/id," +
+              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
+              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, 
$path/]]")
+          doAs(admin, sql(insertOverwriteSql))
+        })
+      }
+    }
+  }
 }
 
 object DeltaCatalogRangerSparkExtensionSuite {

Reply via email to