This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 97fd5b79a [KYUUBI #5565][AUTHZ] Support Delete/Insert/Update table
command for Delta Lake
97fd5b79a is described below
commit 97fd5b79a5c72cb82592b7fef6653d4545ae9ad4
Author: zml1206 <[email protected]>
AuthorDate: Fri Nov 3 22:08:26 2023 +0800
[KYUUBI #5565][AUTHZ] Support Delete/Insert/Update table command for Delta
Lake
### _Why are the changes needed?_
To close #5565.
- Support Delete from table command for Delta Lake in Authz.
- Support Insert table command for Delta Lake in Authz.
- Support Update table command for Delta Lake in Authz.
- Reduce the fields of `createTableSql`.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests)
locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5596 from zml1206/KYUUBI-5565-2.
Closes #5565
452d7d8d4 [zml1206] ut add do as with admin
257200510 [zml1206] improve SubqueryAliasTableExtractor
e2a3fe00f [zml1206] Support Delete/Insert/Update table command for Delta
Lake
Authored-by: zml1206 <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
....kyuubi.plugin.spark.authz.serde.TableExtractor | 1 +
.../src/main/resources/table_command_spec.json | 64 +++++++++++
.../plugin/spark/authz/serde/tableExtractors.scala | 14 +++
.../plugin/spark/authz/gen/DeltaCommands.scala | 48 ++++++++
.../spark/authz/gen/JsonSpecFileGenerator.scala | 2 +-
.../DeltaCatalogRangerSparkExtensionSuite.scala | 122 +++++++++++++--------
6 files changed, 205 insertions(+), 46 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
index dc35a8f51..27775efd5 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
+++
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
@@ -29,5 +29,6 @@
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedIdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedTableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.StringTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.SubqueryAliasTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableTableExtractor
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 5c2dcd09b..9a1c0c34c 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -1997,4 +1997,68 @@
"opType" : "QUERY",
"queryDescs" : [ ],
"uriDescs" : [ ]
+}, {
+ "classname" : "org.apache.spark.sql.delta.commands.DeleteCommand",
+ "tableDescs" : [ {
+ "fieldName" : "catalogTable",
+ "fieldExtractor" : "CatalogTableOptionTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE"
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false
+ }, {
+ "fieldName" : "target",
+ "fieldExtractor" : "SubqueryAliasTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE"
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false
+ } ],
+ "opType" : "QUERY",
+ "queryDescs" : [ ],
+ "uriDescs" : [ ]
+}, {
+ "classname" : "org.apache.spark.sql.delta.commands.UpdateCommand",
+ "tableDescs" : [ {
+ "fieldName" : "catalogTable",
+ "fieldExtractor" : "CatalogTableOptionTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE"
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false
+ }, {
+ "fieldName" : "target",
+ "fieldExtractor" : "SubqueryAliasTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : {
+ "fieldName" : null,
+ "fieldExtractor" : null,
+ "actionType" : "UPDATE"
+ },
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false
+ } ],
+ "opType" : "QUERY",
+ "queryDescs" : [ ],
+ "uriDescs" : [ ]
} ]
\ No newline at end of file
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index a54b58c33..7a5be4cac 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -234,6 +234,20 @@ class ResolvedIdentifierTableExtractor extends
TableExtractor {
}
}
+/**
+ * org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+ *
+ * Extracts the table identity from a SubqueryAlias logical plan node.
+ * For a doubly-nested alias (an alias wrapping another alias), the inner
+ * alias's identifier is used; otherwise the outer identifier is taken.
+ * In both cases the identifier's string form is handed to
+ * StringTableExtractor, which parses it into a Table.
+ * NOTE(review): a non-SubqueryAlias argument will fail the asInstanceOf
+ * cast, and nesting deeper than two levels still resolves via the second
+ * (inner) alias's identifier — confirm this matches all Delta plan shapes.
+ */
+class SubqueryAliasTableExtractor extends TableExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+    v1.asInstanceOf[SubqueryAlias] match {
+      case SubqueryAlias(_, SubqueryAlias(identifier, _)) =>
+        lookupExtractor[StringTableExtractor].apply(spark,
identifier.toString())
+      case SubqueryAlias(identifier, _) =>
+        lookupExtractor[StringTableExtractor].apply(spark,
identifier.toString())
+    }
+  }
+}
+
/**
* org.apache.spark.sql.connector.catalog.Table
*/
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
new file mode 100644
index 000000000..e8cce67ab
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/DeltaCommands.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.gen
+
+import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
+import org.apache.kyuubi.plugin.spark.authz.serde._
+
+/**
+ * Command specs for Delta Lake DML commands (DeleteCommand / UpdateCommand),
+ * consumed by JsonSpecFileGenerator to produce the corresponding entries in
+ * table_command_spec.json.
+ */
+object DeltaCommands extends CommandSpecs[TableCommandSpec] {
+
+  val DeleteCommand = {
+    val cmd = "org.apache.spark.sql.delta.commands.DeleteCommand"
+    // DELETE rewrites table data, so it is audited as an UPDATE action.
+    val actionTypeDesc = ActionTypeDesc(actionType = Some(UPDATE))
+    val tableDesc = TableDesc(
+      "catalogTable",
+      classOf[CatalogTableOptionTableExtractor],
+      actionTypeDesc = Some(actionTypeDesc))
+    // NOTE(review): both descs refer to the same target table; `target` is
+    // kept alongside `catalogTable` presumably as a fallback when the
+    // optional catalogTable field is absent — confirm against serde behavior.
+    val targetDesc = TableDesc(
+      "target",
+      classOf[SubqueryAliasTableExtractor],
+      actionTypeDesc = Some(actionTypeDesc))
+    TableCommandSpec(cmd, Seq(tableDesc, targetDesc))
+  }
+
+  // UpdateCommand shares DeleteCommand's table descriptors; only the
+  // command class name differs.
+  val UpdateCommand = {
+    val cmd = "org.apache.spark.sql.delta.commands.UpdateCommand"
+    DeleteCommand.copy(classname = cmd)
+  }
+
+  override def specs: Seq[TableCommandSpec] = Seq(
+    DeleteCommand,
+    UpdateCommand)
+}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
index 07a8e2852..5fb4ace10 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
@@ -44,7 +44,7 @@ class JsonSpecFileGenerator extends AnyFunSuite {
// scalastyle:on
test("check spec json files") {
writeCommandSpecJson("database", Seq(DatabaseCommands))
- writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands,
HudiCommands))
+ writeCommandSpecJson("table", Seq(TableCommands, IcebergCommands,
HudiCommands, DeltaCommands))
writeCommandSpecJson("function", Seq(FunctionCommands))
writeCommandSpecJson("scan", Seq(Scans))
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
index 405c5512d..4fc73adce 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
@@ -38,6 +38,18 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
val table1 = "table1_delta"
val table2 = "table2_delta"
+ def createTableSql(namespace: String, table: String): String =
+ s"""
+ |CREATE TABLE IF NOT EXISTS $namespace.$table (
+ | id INT,
+ | name STRING,
+ | gender STRING,
+ | birthDate TIMESTAMP
+ |)
+ |USING DELTA
+ |PARTITIONED BY (gender)
+ |""".stripMargin
+
override def withFixture(test: NoArgTest): Outcome = {
test()
}
@@ -66,13 +78,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
s"""
|CREATE TABLE IF NOT EXISTS $namespace1.$table1 (
| id INT,
- | firstName STRING,
- | middleName STRING,
- | lastName STRING,
+ | name STRING,
| gender STRING,
- | birthDate TIMESTAMP,
- | ssn STRING,
- | salary INT
+ | birthDate TIMESTAMP
|) USING DELTA
|""".stripMargin
interceptContains[AccessControlException] {
@@ -80,21 +88,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
}(s"does not have [create] privilege on [$namespace1/$table1]")
doAs(admin, sql(createNonPartitionTableSql))
- val createPartitionTableSql =
- s"""
- |CREATE TABLE IF NOT EXISTS $namespace1.$table2 (
- | id INT,
- | firstName STRING,
- | middleName STRING,
- | lastName STRING,
- | gender STRING,
- | birthDate TIMESTAMP,
- | ssn STRING,
- | salary INT
- |)
- |USING DELTA
- |PARTITIONED BY (gender)
- |""".stripMargin
+ val createPartitionTableSql = createTableSql(namespace1, table2)
interceptContains[AccessControlException] {
doAs(someone, sql(createPartitionTableSql))
}(s"does not have [create] privilege on [$namespace1/$table2]")
@@ -109,13 +103,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
s"""
|CREATE OR REPLACE TABLE $namespace1.$table1 (
| id INT,
- | firstName STRING,
- | middleName STRING,
- | lastName STRING,
+ | name STRING,
| gender STRING,
- | birthDate TIMESTAMP,
- | ssn STRING,
- | salary INT
+ | birthDate TIMESTAMP
|) USING DELTA
|""".stripMargin
interceptContains[AccessControlException] {
@@ -128,23 +118,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
test("alter table") {
withCleanTmpResources(Seq((s"$namespace1.$table1", "table"),
(s"$namespace1", "database"))) {
doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
- doAs(
- admin,
- sql(
- s"""
- |CREATE TABLE IF NOT EXISTS $namespace1.$table1 (
- | id INT,
- | firstName STRING,
- | middleName STRING,
- | lastName STRING,
- | gender STRING,
- | birthDate TIMESTAMP,
- | ssn STRING,
- | salary INT
- |)
- |USING DELTA
- |PARTITIONED BY (gender)
- |""".stripMargin))
+ doAs(admin, sql(createTableSql(namespace1, table1)))
// add columns
interceptContains[AccessControlException](
@@ -164,7 +138,7 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
doAs(
someone,
sql(s"ALTER TABLE $namespace1.$table1" +
- s" REPLACE COLUMNS (id INT, firstName STRING)")))(
+ s" REPLACE COLUMNS (id INT, name STRING)")))(
s"does not have [alter] privilege on [$namespace1/$table1]")
// rename column
@@ -189,6 +163,64 @@ class DeltaCatalogRangerSparkExtensionSuite extends
RangerSparkExtensionSuite {
s"does not have [alter] privilege on [$namespace1/$table1]")
}
}
+
+ test("delete from table") {
+ withCleanTmpResources(Seq((s"$namespace1.$table1", "table"),
(s"$namespace1", "database"))) {
+ doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+ doAs(admin, sql(createTableSql(namespace1, table1)))
+ val deleteFromTableSql = s"DELETE FROM $namespace1.$table1 WHERE
birthDate < '1955-01-01'"
+ interceptContains[AccessControlException](
+ doAs(someone, sql(deleteFromTableSql)))(
+ s"does not have [update] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(deleteFromTableSql))
+ }
+ }
+
+ test("insert table") {
+ withSingleCallEnabled {
+ withCleanTmpResources(Seq(
+ (s"$namespace1.$table1", "table"),
+ (s"$namespace1.$table2", "table"),
+ (s"$namespace1", "database"))) {
+ doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+ doAs(admin, sql(createTableSql(namespace1, table1)))
+ doAs(admin, sql(createTableSql(namespace1, table2)))
+
+ // insert into
+ val insertIntoSql = s"INSERT INTO $namespace1.$table1" +
+ s" SELECT * FROM $namespace1.$table2"
+ interceptContains[AccessControlException](
+ doAs(someone, sql(insertIntoSql)))(
+ s"does not have [select] privilege on
[$namespace1/$table2/id,$namespace1/$table2/name," +
+ s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+ s" [update] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(insertIntoSql))
+
+ // insert overwrite
+ val insertOverwriteSql = s"INSERT OVERWRITE $namespace1.$table1" +
+ s" SELECT * FROM $namespace1.$table2"
+ interceptContains[AccessControlException](
+ doAs(someone, sql(insertOverwriteSql)))(
+ s"does not have [select] privilege on
[$namespace1/$table2/id,$namespace1/$table2/name," +
+ s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+ s" [update] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(insertOverwriteSql))
+ }
+ }
+ }
+
+ test("update table") {
+ withCleanTmpResources(Seq((s"$namespace1.$table1", "table"),
(s"$namespace1", "database"))) {
+ doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+ doAs(admin, sql(createTableSql(namespace1, table1)))
+ val updateTableSql = s"UPDATE $namespace1.$table1" +
+ s" SET gender = 'Female' WHERE gender = 'F'"
+ interceptContains[AccessControlException](
+ doAs(someone, sql(updateTableSql)))(
+ s"does not have [update] privilege on [$namespace1/$table1]")
+ doAs(admin, sql(updateTableSql))
+ }
+ }
}
object DeltaCatalogRangerSparkExtensionSuite {