This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 97fd5b79a [KYUUBI #5565][AUTHZ] Support Delete/Insert/Update table command for Delta Lake
97fd5b79a is described below

commit 97fd5b79a5c72cb82592b7fef6653d4545ae9ad4
Author: zml1206 <[email protected]>
AuthorDate: Fri Nov 3 22:08:26 2023 +0800

    [KYUUBI #5565][AUTHZ] Support Delete/Insert/Update table command for Delta Lake
    
    ### _Why are the changes needed?_
    To close #5565.
    - Support Delete from table command for Delta Lake in Authz.
    - Support Insert table command for Delta Lake in Authz.
    - Support Update table command for Delta Lake in Authz.
    - Reduce the fields of `createTableSql`.
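
    For illustration, the Delta DML statements now guarded by Authz checks look
    roughly like the following (a minimal sketch assuming a `SparkSession` named
    `spark`; `db.tbl` and `db.src` are placeholder identifiers, not names from
    this patch):

        // Illustrative only: statements that now trigger table-level privilege checks.
        spark.sql("DELETE FROM db.tbl WHERE birthDate < '1955-01-01'")
        spark.sql("UPDATE db.tbl SET gender = 'Female' WHERE gender = 'F'")
        spark.sql("INSERT INTO db.tbl SELECT * FROM db.src")
        spark.sql("INSERT OVERWRITE db.tbl SELECT * FROM db.src")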
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    No.
    
    Closes #5596 from zml1206/KYUUBI-5565-2.
    
    Closes #5565
    
    452d7d8d4 [zml1206] ut add do as with admin
    257200510 [zml1206] improve SubqueryAliasTableExtractor
    e2a3fe00f [zml1206] Support Delete/Insert/Update table command for Delta Lake
    
    Authored-by: zml1206 <[email protected]>
    Signed-off-by: Kent Yao <[email protected]>
---
 ....kyuubi.plugin.spark.authz.serde.TableExtractor |   1 +
 .../src/main/resources/table_command_spec.json     |  64 +++++++++++
 .../plugin/spark/authz/serde/tableExtractors.scala |  14 +++
 .../plugin/spark/authz/gen/DeltaCommands.scala     |  48 ++++++++
 .../spark/authz/gen/JsonSpecFileGenerator.scala    |   2 +-
 .../DeltaCatalogRangerSparkExtensionSuite.scala    | 122 +++++++++++++--------
 6 files changed, 205 insertions(+), 46 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
index dc35a8f51..27775efd5 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
@@ -29,5 +29,6 @@ org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.ResolvedIdentifierTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.ResolvedTableTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.StringTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.SubqueryAliasTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.TableTableExtractor
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 5c2dcd09b..9a1c0c34c 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -1997,4 +1997,68 @@
   "opType" : "QUERY",
   "queryDescs" : [ ],
   "uriDescs" : [ ]
+}, {
+  "classname" : "org.apache.spark.sql.delta.commands.DeleteCommand",
+  "tableDescs" : [ {
+    "fieldName" : "catalogTable",
+    "fieldExtractor" : "CatalogTableOptionTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  }, {
+    "fieldName" : "target",
+    "fieldExtractor" : "SubqueryAliasTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
+}, {
+  "classname" : "org.apache.spark.sql.delta.commands.UpdateCommand",
+  "tableDescs" : [ {
+    "fieldName" : "catalogTable",
+    "fieldExtractor" : "CatalogTableOptionTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  }, {
+    "fieldName" : "target",
+    "fieldExtractor" : "SubqueryAliasTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : {
+      "fieldName" : null,
+      "fieldExtractor" : null,
+      "actionType" : "UPDATE"
+    },
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "QUERY",
+  "queryDescs" : [ ],
+  "uriDescs" : [ ]
 } ]
\ No newline at end of file
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index a54b58c33..7a5be4cac 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -234,6 +234,20 @@ class ResolvedIdentifierTableExtractor extends TableExtractor {
   }
 }
 
+/**
+ * org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+ */
+class SubqueryAliasTableExtractor extends TableExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+    v1.asInstanceOf[SubqueryAlias] match {
+      case SubqueryAlias(_, SubqueryAlias(identifier, _)) =>
+        lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
+      case SubqueryAlias(identifier, _) =>
+        lookupExtractor[StringTableExtractor].apply(spark, identifier.toString())
+    }
+  }
+}
+
 /**
  * org.apache.spark.sql.connector.catalog.Table
  */
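
A minimal sketch of the plan shapes the new extractor handles, constructed by hand
for illustration (it assumes Spark's AliasIdentifier, SubqueryAlias and LocalRelation
constructors; none of the code below is part of this patch):

    import org.apache.spark.sql.catalyst.AliasIdentifier
    import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias}

    // Per the spec above, the commands' `target` field is a SubqueryAlias; when the
    // statement uses a table alias it may be wrapped in a second SubqueryAlias.
    val relation = LocalRelation()
    val plain   = SubqueryAlias(AliasIdentifier("table1_delta", Seq("default")), relation)
    val aliased = SubqueryAlias(AliasIdentifier("t"), plain)
    // SubqueryAliasTableExtractor unwraps at most one outer alias and hands the
    // remaining identifier's string form to StringTableExtractor, so both shapes
    // resolve to the same table (here: default.table1_delta).
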
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
new file mode 100644
index 000000000..e8cce67ab
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.gen
+
+import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
+import org.apache.kyuubi.plugin.spark.authz.serde._
+
+object DeltaCommands extends CommandSpecs[TableCommandSpec] {
+
+  val DeleteCommand = {
+    val cmd = "org.apache.spark.sql.delta.commands.DeleteCommand"
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val tableDesc = TableDesc(
+      "catalogTable",
+      classOf[CatalogTableOptionTableExtractor],
+      actionTypeDesc = Some(actionTypeDesc))
+    TableCommandSpec(cmd, Seq(tableDesc))
+    val targetDesc = TableDesc(
+      "target",
+      classOf[SubqueryAliasTableExtractor],
+      actionTypeDesc = Some(actionTypeDesc))
+    TableCommandSpec(cmd, Seq(tableDesc, targetDesc))
+  }
+
+  val UpdateCommand = {
+    val cmd = "org.apache.spark.sql.delta.commands.UpdateCommand"
+    DeleteCommand.copy(classname = cmd)
+  }
+
+  override def specs: Seq[TableCommandSpec] = Seq(
+    DeleteCommand,
+    UpdateCommand)
+}
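
The two descriptors built here serialize into the DeleteCommand and UpdateCommand
entries added to table_command_spec.json earlier in this diff; UpdateCommand reuses
the DeleteCommand spec via `copy` because both commands expose the same `catalogTable`
and `target` fields. A sketch of how that reuse pattern could extend to another command
with the same field shape (the class name below is hypothetical, not part of this patch):

    // Hypothetical reuse of the spec, shown only to illustrate the copy pattern.
    val AnotherDeltaCommand = DeleteCommand.copy(
      classname = "org.apache.spark.sql.delta.commands.SomeOtherCommand")
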
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
index 07a8e2852..5fb4ace10 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
@@ -44,7 +44,7 @@ class JsonSpecFileGenerator extends AnyFunSuite {
   // scalastyle:on
   test("check spec json files") {
     writeCommandSpecJson("database", Seq(DatabaseCommands))
-    writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands))
+    writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands, HudiCommands, DeltaCommands))
     writeCommandSpecJson("function", Seq(FunctionCommands))
     writeCommandSpecJson("scan", Seq(Scans))
   }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
index 405c5512d..4fc73adce 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
@@ -38,6 +38,18 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
   val table1 = "table1_delta"
   val table2 = "table2_delta"
 
+  def createTableSql(namespace: String, table: String): String =
+    s"""
+       |CREATE TABLE IF NOT EXISTS $namespace.$table (
+       |  id INT,
+       |  name STRING,
+       |  gender STRING,
+       |  birthDate TIMESTAMP
+       |)
+       |USING DELTA
+       |PARTITIONED BY (gender)
+       |""".stripMargin
+
   override def withFixture(test: NoArgTest): Outcome = {
     test()
   }
@@ -66,13 +78,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
         s"""
            |CREATE TABLE IF NOT EXISTS $namespace1.$table1 (
            |  id INT,
-           |  firstName STRING,
-           |  middleName STRING,
-           |  lastName STRING,
+           |  name STRING,
            |  gender STRING,
-           |  birthDate TIMESTAMP,
-           |  ssn STRING,
-           |  salary INT
+           |  birthDate TIMESTAMP
            |) USING DELTA
            |""".stripMargin
       interceptContains[AccessControlException] {
@@ -80,21 +88,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
       }(s"does not have [create] privilege on [$namespace1/$table1]")
       doAs(admin, sql(createNonPartitionTableSql))
 
-      val createPartitionTableSql =
-        s"""
-           |CREATE TABLE IF NOT EXISTS $namespace1.$table2 (
-           |  id INT,
-           |  firstName STRING,
-           |  middleName STRING,
-           |  lastName STRING,
-           |  gender STRING,
-           |  birthDate TIMESTAMP,
-           |  ssn STRING,
-           |  salary INT
-           |)
-           |USING DELTA
-           |PARTITIONED BY (gender)
-           |""".stripMargin
+      val createPartitionTableSql = createTableSql(namespace1, table2)
       interceptContains[AccessControlException] {
         doAs(someone, sql(createPartitionTableSql))
       }(s"does not have [create] privilege on [$namespace1/$table2]")
@@ -109,13 +103,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
         s"""
            |CREATE OR REPLACE TABLE $namespace1.$table1 (
            |  id INT,
-           |  firstName STRING,
-           |  middleName STRING,
-           |  lastName STRING,
+           |  name STRING,
            |  gender STRING,
-           |  birthDate TIMESTAMP,
-           |  ssn STRING,
-           |  salary INT
+           |  birthDate TIMESTAMP
            |) USING DELTA
            |""".stripMargin
       interceptContains[AccessControlException] {
@@ -128,23 +118,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
   test("alter table") {
     withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) {
       doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
-      doAs(
-        admin,
-        sql(
-          s"""
-             |CREATE TABLE IF NOT EXISTS $namespace1.$table1 (
-             |  id INT,
-             |  firstName STRING,
-             |  middleName STRING,
-             |  lastName STRING,
-             |  gender STRING,
-             |  birthDate TIMESTAMP,
-             |  ssn STRING,
-             |  salary INT
-             |)
-             |USING DELTA
-             |PARTITIONED BY (gender)
-             |""".stripMargin))
+      doAs(admin, sql(createTableSql(namespace1, table1)))
 
       // add columns
       interceptContains[AccessControlException](
@@ -164,7 +138,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
         doAs(
           someone,
           sql(s"ALTER TABLE $namespace1.$table1" +
-            s" REPLACE COLUMNS (id INT, firstName STRING)")))(
+            s" REPLACE COLUMNS (id INT, name STRING)")))(
         s"does not have [alter] privilege on [$namespace1/$table1]")
 
       // rename column
@@ -189,6 +163,64 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
         s"does not have [alter] privilege on [$namespace1/$table1]")
     }
   }
+
+  test("delete from table") {
+    withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) {
+      doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+      doAs(admin, sql(createTableSql(namespace1, table1)))
+      val deleteFromTableSql = s"DELETE FROM $namespace1.$table1 WHERE birthDate < '1955-01-01'"
+      interceptContains[AccessControlException](
+        doAs(someone, sql(deleteFromTableSql)))(
+        s"does not have [update] privilege on [$namespace1/$table1]")
+      doAs(admin, sql(deleteFromTableSql))
+    }
+  }
+
+  test("insert table") {
+    withSingleCallEnabled {
+      withCleanTmpResources(Seq(
+        (s"$namespace1.$table1", "table"),
+        (s"$namespace1.$table2", "table"),
+        (s"$namespace1", "database"))) {
+        doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+        doAs(admin, sql(createTableSql(namespace1, table1)))
+        doAs(admin, sql(createTableSql(namespace1, table2)))
+
+        // insert into
+        val insertIntoSql = s"INSERT INTO $namespace1.$table1" +
+          s" SELECT * FROM $namespace1.$table2"
+        interceptContains[AccessControlException](
+          doAs(someone, sql(insertIntoSql)))(
+          s"does not have [select] privilege on 
[$namespace1/$table2/id,$namespace1/$table2/name," +
+            s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+            s" [update] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(insertIntoSql))
+
+        // insert overwrite
+        val insertOverwriteSql = s"INSERT OVERWRITE $namespace1.$table1" +
+          s" SELECT * FROM $namespace1.$table2"
+        interceptContains[AccessControlException](
+          doAs(someone, sql(insertOverwriteSql)))(
+          s"does not have [select] privilege on 
[$namespace1/$table2/id,$namespace1/$table2/name," +
+            s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+            s" [update] privilege on [$namespace1/$table1]")
+        doAs(admin, sql(insertOverwriteSql))
+      }
+    }
+  }
+
+  test("update table") {
+    withCleanTmpResources(Seq((s"$namespace1.$table1", "table"), (s"$namespace1", "database"))) {
+      doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+      doAs(admin, sql(createTableSql(namespace1, table1)))
+      val updateTableSql = s"UPDATE $namespace1.$table1" +
+        s" SET gender = 'Female' WHERE gender = 'F'"
+      interceptContains[AccessControlException](
+        doAs(someone, sql(updateTableSql)))(
+        s"does not have [update] privilege on [$namespace1/$table1]")
+      doAs(admin, sql(updateTableSql))
+    }
+  }
 }
 
 object DeltaCatalogRangerSparkExtensionSuite {
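
Taken together, the new tests pin down the privilege mapping produced by the Delta
command specs above; a compact summary in comment form (namespace and table names
are placeholders mirroring the suite):

    // Privilege checks asserted by DeltaCatalogRangerSparkExtensionSuite:
    //   DELETE FROM ns.tbl WHERE ...          -> [update] on ns/tbl
    //   UPDATE ns.tbl SET ... WHERE ...       -> [update] on ns/tbl
    //   INSERT INTO / INSERT OVERWRITE ns.tbl
    //     SELECT * FROM ns.src                -> [select] on ns/src columns
    //                                            and [update] on ns/tbl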
