This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 40eed8c4a [KYUUBI #3675][Subtask] Get table owner for DDL/DML v2
commands
40eed8c4a is described below
commit 40eed8c4a7e7a68b47a94e1174df6426e10d5f41
Author: zhouyifan279 <[email protected]>
AuthorDate: Tue Oct 25 14:45:54 2022 +0800
[KYUUBI #3675][Subtask] Get table owner for DDL/DML v2 commands
### _Why are the changes needed?_
Subtask of #3607
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #3683 from zhouyifan279/3675.
Closes #3675
45b960a4 [zhouyifan279] [KYUUBI #3675][Subtask] Get table owner for DDL/DML
v2 commands
d1b4a35f [zhouyifan279] [KYUUBI #3675][Subtask] Get table owner for DDL/DML
v2 commands
0ef3b986 [zhouyifan279] [KYUUBI #3675][Subtask] Get table owner for DDL/DML
v2 commands
Authored-by: zhouyifan279 <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../plugin/spark/authz/IcebergCommands.scala | 14 +-
.../plugin/spark/authz/util/AuthZUtils.scala | 2 +-
.../kyuubi/plugin/spark/authz/v2Commands.scala | 118 ++--
.../IcebergCatalogPrivilegesBuilderSuite.scala | 133 ++++
.../spark/authz/PrivilegesBuilderSuite.scala | 18 +-
.../spark/authz/V2CommandsPrivilegesSuite.scala | 671 +++++++++++++++++++++
.../V2JdbcTableCatalogPrivilegesBuilderSuite.scala | 63 ++
7 files changed, 966 insertions(+), 53 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala
index fae9e6a02..b2b69be12 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala
@@ -73,6 +73,7 @@ object IcebergCommands extends Enumeration {
* @param buildInput input [[PrivilegeObject]] for privilege check
* @param buildOutput output [[PrivilegeObject]] for privilege check
* @param outputActionType [[PrivilegeObjectActionType]] for output
[[PrivilegeObject]]
+ * @param resolveOutputTableOwner Whether to resolve table owner for output
[[PrivilegeObject]]
*/
case class CmdPrivilegeBuilder(
operationType: OperationType = QUERY,
@@ -85,8 +86,10 @@ object IcebergCommands extends Enumeration {
LogicalPlan,
ArrayBuffer[PrivilegeObject],
Seq[CommandType],
- PrivilegeObjectActionType) => Unit = v2Commands.defaultBuildOutput,
- outputActionType: PrivilegeObjectActionType =
PrivilegeObjectActionType.OTHER)
+ PrivilegeObjectActionType,
+ Boolean) => Unit = v2Commands.defaultBuildOutput,
+ outputActionType: PrivilegeObjectActionType =
PrivilegeObjectActionType.OTHER,
+ resolveOutputTableOwner: Boolean = true)
extends super.Val {
def buildPrivileges(
@@ -94,7 +97,12 @@ object IcebergCommands extends Enumeration {
inputObjs: ArrayBuffer[PrivilegeObject],
outputObjs: ArrayBuffer[PrivilegeObject]): Unit = {
this.buildInput(plan, inputObjs, commandTypes)
- this.buildOutput(plan, outputObjs, commandTypes, outputActionType)
+ this.buildOutput(
+ plan,
+ outputObjs,
+ commandTypes,
+ outputActionType,
+ resolveOutputTableOwner)
}
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index 801978ea9..5751c4630 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -40,7 +40,7 @@ private[authz] object AuthZUtils {
case Success(value) => value.asInstanceOf[T]
case Failure(e) =>
val candidates =
o.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]")
- throw new RuntimeException(s"$name not in $candidates", e)
+ throw new RuntimeException(s"$name not in ${o.getClass} $candidates",
e)
}
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
index e7e647b53..05f3018a3 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
@@ -85,29 +85,41 @@ object v2Commands extends Enumeration {
LogicalPlan,
ArrayBuffer[PrivilegeObject],
Seq[CommandType],
- PrivilegeObjectActionType) => Unit =
- (plan, outputObjs, commandTypes, outputObjsActionType) => {
+ PrivilegeObjectActionType,
+ Boolean) => Unit =
+ (plan, outputObjs, commandTypes, outputObjsActionType, resolveTableOwner)
=> {
commandTypes.foreach {
case HasTableNameAsIdentifier =>
val table = invoke(plan, "tableName").asInstanceOf[Identifier]
- outputObjs += v2TablePrivileges(table)
+ val owner = if (resolveTableOwner) getTableOwnerFromV2Plan(plan,
table) else None
+ outputObjs += v2TablePrivileges(table, owner = owner)
case HasTableAsIdentifierOption =>
- val table = getFieldVal[AnyRef](plan, "table")
- val tableIdent = getFieldVal[Option[Identifier]](table, "identifier")
+ val datasourceV2Relation = getFieldVal[LogicalPlan](plan, "table")
+ .find(_.isInstanceOf[DataSourceV2Relation])
+ val tableIdent = datasourceV2Relation.flatMap { r =>
+ getFieldVal[Option[Identifier]](r, "identifier")
+ }
if (tableIdent.isDefined) {
- outputObjs += v2TablePrivileges(tableIdent.get, actionType =
outputObjsActionType)
+ val owner =
+ if (resolveTableOwner)
getDatasourceV2TableOwner(datasourceV2Relation.get) else None
+ outputObjs += v2TablePrivileges(
+ tableIdent.get,
+ owner = owner,
+ actionType = outputObjsActionType)
}
case HasTableAsIdentifier =>
- val table = getFieldVal[LogicalPlan](plan, "table")
- val tableIdent = getFieldVal[Identifier](table, "identifier")
- outputObjs += v2TablePrivileges(tableIdent)
+ val resolvedTable = getFieldVal[LogicalPlan](plan, "table")
+ val tableIdent = getFieldVal[Identifier](resolvedTable, "identifier")
+ val owner = if (resolveTableOwner)
getDatasourceV2TableOwner(resolvedTable) else None
+ outputObjs += v2TablePrivileges(tableIdent, owner = owner)
case HasChildAsIdentifier =>
- val table = getFieldVal[AnyRef](plan, "child")
- val tableIdent = getFieldVal[Identifier](table, "identifier")
- outputObjs += v2TablePrivileges(tableIdent)
+ val resolvedTable = getFieldVal[LogicalPlan](plan, "child")
+ val tableIdent = getFieldVal[Identifier](resolvedTable, "identifier")
+ val owner = if (resolveTableOwner)
getDatasourceV2TableOwner(resolvedTable) else None
+ outputObjs += v2TablePrivileges(tableIdent, owner = owner)
case _ =>
}
@@ -123,6 +135,7 @@ object v2Commands extends Enumeration {
* @param buildInput input [[PrivilegeObject]] for privilege check
* @param buildOutput output [[PrivilegeObject]] for privilege check
* @param outputActionType [[PrivilegeObjectActionType]] for output
[[PrivilegeObject]]
+ * @param resolveOutputTableOwner Whether to resolve table owner for output
[[PrivilegeObject]]
*/
case class CmdPrivilegeBuilder(
operationType: OperationType = QUERY,
@@ -135,8 +148,10 @@ object v2Commands extends Enumeration {
LogicalPlan,
ArrayBuffer[PrivilegeObject],
Seq[CommandType],
- PrivilegeObjectActionType) => Unit = defaultBuildOutput,
- outputActionType: PrivilegeObjectActionType =
PrivilegeObjectActionType.OTHER)
+ PrivilegeObjectActionType,
+ Boolean) => Unit = defaultBuildOutput,
+ outputActionType: PrivilegeObjectActionType =
PrivilegeObjectActionType.OTHER,
+ resolveOutputTableOwner: Boolean = true)
extends super.Val {
def buildPrivileges(
@@ -144,7 +159,12 @@ object v2Commands extends Enumeration {
inputObjs: ArrayBuffer[PrivilegeObject],
outputObjs: ArrayBuffer[PrivilegeObject]): Unit = {
this.buildInput(plan, inputObjs, commandTypes)
- this.buildOutput(plan, outputObjs, commandTypes, outputActionType)
+ this.buildOutput(
+ plan,
+ outputObjs,
+ commandTypes,
+ outputActionType,
+ resolveOutputTableOwner)
}
}
@@ -166,7 +186,7 @@ object v2Commands extends Enumeration {
val CreateNamespace: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = CREATEDATABASE,
- buildOutput = (plan, outputObjs, _, _) => {
+ buildOutput = (plan, outputObjs, _, _, _) => {
if (isSparkVersionAtLeast("3.3")) {
val resolvedNamespace = getFieldVal[Any](plan, "name")
val databases = getFieldVal[Seq[String]](resolvedNamespace,
"nameParts")
@@ -179,7 +199,7 @@ object v2Commands extends Enumeration {
val DropNamespace: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = DROPDATABASE,
- buildOutput = (plan, outputObjs, _, _) => {
+ buildOutput = (plan, outputObjs, _, _, _) => {
val resolvedNamespace = getFieldVal[LogicalPlan](plan, "namespace")
val databases = getFieldVal[Seq[String]](resolvedNamespace, "namespace")
outputObjs += databasePrivileges(quote(databases))
@@ -190,24 +210,29 @@ object v2Commands extends Enumeration {
val CreateTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = CREATETABLE,
commandTypes = Seq(HasTableNameAsIdentifier),
- leastVer = Some("3.3"))
+ leastVer = Some("3.3"),
+ resolveOutputTableOwner = false)
val CreateV2Table: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = CREATETABLE,
commandTypes = Seq(HasTableNameAsIdentifier),
- mostVer = Some("3.2"))
+ mostVer = Some("3.2"),
+ resolveOutputTableOwner = false)
val CreateTableAsSelect: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = CREATETABLE,
- commandTypes = Seq(HasTableNameAsIdentifier, HasQueryAsLogicalPlan))
+ commandTypes = Seq(HasTableNameAsIdentifier, HasQueryAsLogicalPlan),
+ resolveOutputTableOwner = false)
val ReplaceTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = CREATETABLE,
- commandTypes = Seq(HasTableNameAsIdentifier))
+ commandTypes = Seq(HasTableNameAsIdentifier),
+ resolveOutputTableOwner = false)
val ReplaceTableAsSelect: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = CREATETABLE,
- commandTypes = Seq(HasTableNameAsIdentifier, HasQueryAsLogicalPlan))
+ commandTypes = Seq(HasTableNameAsIdentifier, HasQueryAsLogicalPlan),
+ resolveOutputTableOwner = false)
// with V2WriteCommand
@@ -273,7 +298,7 @@ object v2Commands extends Enumeration {
val CommentOnNamespace: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = ALTERDATABASE,
- buildOutput = (plan, outputObjs, _, _) => {
+ buildOutput = (plan, outputObjs, _, _, _) => {
val resolvedNamespace = getFieldVal[AnyRef](plan, "child")
val namespace = getFieldVal[Seq[String]](resolvedNamespace, "namespace")
outputObjs += databasePrivileges(quote(namespace))
@@ -286,27 +311,31 @@ object v2Commands extends Enumeration {
val DropTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = DROPTABLE,
- buildOutput = (plan, outputObjs, _, _) => {
- val tableIdent =
+ buildOutput = (plan, outputObjs, _, _, _) => {
+ val (tableIdent, owner) =
if (isSparkVersionAtLeast("3.1")) {
val resolvedTable = getFieldVal[LogicalPlan](plan, "child")
- getFieldVal[Identifier](resolvedTable, "identifier")
+ (
+ getFieldVal[Identifier](resolvedTable, "identifier"),
+ getDatasourceV2TableOwner(resolvedTable))
} else {
- getFieldVal[Identifier](plan, "ident")
+ (getFieldVal[Identifier](plan, "ident"), None)
}
- outputObjs += v2TablePrivileges(tableIdent)
+ outputObjs += v2TablePrivileges(tableIdent, owner = owner)
})
val MergeIntoTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
buildInput = (plan, inputObjs, _) => {
- val table = getFieldVal[DataSourceV2Relation](plan, "sourceTable")
+ val table = getFieldVal[LogicalPlan](plan, "sourceTable")
buildQuery(table, inputObjs)
},
- buildOutput = (plan, outputObjs, _, _) => {
- val table = getFieldVal[DataSourceV2Relation](plan, "targetTable")
- if (table.identifier.isDefined) {
- outputObjs += v2TablePrivileges(
- table.identifier.get,
- actionType = PrivilegeObjectActionType.UPDATE)
+ buildOutput = (plan, outputObjs, _, _, _) => {
+ val aliasOrRelation = getFieldVal[LogicalPlan](plan, "targetTable")
+ aliasOrRelation.find(_.isInstanceOf[DataSourceV2Relation]).foreach {
+ case table: DataSourceV2Relation if table.identifier.isDefined =>
+ outputObjs += v2TablePrivileges(
+ table.identifier.get,
+ owner = getDatasourceV2TableOwner(table),
+ actionType = PrivilegeObjectActionType.UPDATE)
}
})
@@ -317,10 +346,14 @@ object v2Commands extends Enumeration {
val TruncateTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
leastVer = Some("3.2"),
- buildOutput = (plan, outputObjs, _, _) => {
- val table = getFieldVal[Any](plan, "table")
- val tableIdent = getFieldVal[Identifier](table, "identifier")
- outputObjs += v2TablePrivileges(tableIdent, actionType =
PrivilegeObjectActionType.UPDATE)
+ buildOutput = (plan, outputObjs, _, _, _) => {
+ val resolvedTable = getFieldVal[LogicalPlan](plan, "table")
+ val tableIdent = getFieldVal[Identifier](resolvedTable, "identifier")
+ val owner = getDatasourceV2TableOwner(resolvedTable)
+ outputObjs += v2TablePrivileges(
+ tableIdent,
+ owner = owner,
+ actionType = PrivilegeObjectActionType.UPDATE)
})
// with V2AlterTableCommand
@@ -328,11 +361,12 @@ object v2Commands extends Enumeration {
val AlterTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
operationType = ALTERTABLE_ADDCOLS,
mostVer = Some("3.1"),
- buildOutput = (plan, outputObjs, _, _) => {
- val table = getFieldVal[Any](plan, "table")
+ buildOutput = (plan, outputObjs, _, _, _) => {
+ val table = getFieldVal[LogicalPlan](plan, "table")
val tableIdent = getFieldVal[Option[Identifier]](table, "identifier")
if (tableIdent.isDefined) {
- outputObjs += v2TablePrivileges(tableIdent.get)
+ val owner = getDatasourceV2TableOwner(table)
+ outputObjs += v2TablePrivileges(tableIdent.get, owner = owner)
}
})
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
new file mode 100644
index 000000000..a5ec2cfcc
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz
+
+import org.scalatest.Outcome
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.plugin.spark.authz.OperationType._
+import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
+
+class IcebergCatalogPrivilegesBuilderSuite extends V2CommandsPrivilegesSuite {
+ override protected val catalogImpl: String = "hive"
+ override protected val sqlExtensions: String =
+ if (isSparkV32OrGreater) {
+ "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
+ } else ""
+ override protected def format = "iceberg"
+
+ override protected val supportsUpdateTable = false
+ override protected val supportsMergeIntoTable = false
+ override protected val supportsDelete = false
+ override protected val supportsPartitionGrammar = true
+ override protected val supportsPartitionManagement = false
+
+ override def beforeAll(): Unit = {
+ if (isSparkV32OrGreater) {
+ spark.conf.set(
+ s"spark.sql.catalog.$catalogV2",
+ "org.apache.iceberg.spark.SparkCatalog")
+ spark.conf.set(s"spark.sql.catalog.$catalogV2.type", "hadoop")
+ spark.conf.set(
+ s"spark.sql.catalog.$catalogV2.warehouse",
+ Utils.createTempDir("iceberg-hadoop").toString)
+ }
+ super.beforeAll()
+ }
+
+ override def withFixture(test: NoArgTest): Outcome = {
+ assume(isSparkV32OrGreater)
+ test()
+ }
+
+ test("DeleteFromIcebergTable") {
+ val plan = sql(s"DELETE FROM $catalogTable WHERE key = 1
").queryExecution.analyzed
+ assert(IcebergCommands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === QUERY)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.UPDATE)
+ }
+
+ test("UpdateIcebergTable") {
+ val plan = sql(s"UPDATE $catalogTable SET value = 'b' WHERE key = 1
").queryExecution.analyzed
+ assert(IcebergCommands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === QUERY)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.UPDATE)
+ }
+
+ test("MergeIntoIcebergTable") {
+ val table = "MergeIntoIcebergTable"
+ withV2Table(table) { tableId =>
+ sql(s"CREATE TABLE $tableId (key int, value String) USING iceberg")
+ val plan = sql(s"MERGE INTO $tableId t " +
+ s"USING (SELECT * FROM $catalogTable) s " +
+ s"ON t.key = s.key " +
+ s"WHEN MATCHED THEN UPDATE SET t.value = s.value " +
+ s"WHEN NOT MATCHED THEN INSERT *").queryExecution.analyzed
+ assert(IcebergCommands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === QUERY)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 1)
+ val po0 = inputs.head
+ assert(po0.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po0.dbname === namespace)
+ assert(po0.objectName === catalogTableShort)
+ assert(po0.columns === Seq("key", "value"))
+ checkV2TableOwner(po0)
+
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.UPDATE)
+ }
+ }
+}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
index 89be44630..3d4bc3db6 100644
---
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
@@ -67,10 +67,12 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
checkColumns(sql(query).queryExecution.optimizedPlan, cols)
}
- protected def checkTableOwner(po: PrivilegeObject): Unit = {
+ protected def checkTableOwner(
+ po: PrivilegeObject,
+ expectedOwner: String = defaultTableOwner): Unit = {
if (catalogImpl == "hive" && po.privilegeObjectType ===
PrivilegeObjectType.TABLE_OR_VIEW) {
assert(po.owner.isDefined)
- assert(po.owner.get === defaultTableOwner)
+ assert(po.owner.get === expectedOwner)
}
}
@@ -91,12 +93,14 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
}
override def afterAll(): Unit = {
- Seq(reusedTable, reusedPartTable).foreach { t =>
- sql(s"DROP TABLE IF EXISTS $t")
+ try {
+ Seq(reusedTable, reusedPartTable).foreach { t =>
+ sql(s"DROP TABLE IF EXISTS $t")
+ }
+ sql(s"DROP DATABASE IF EXISTS $reusedDb")
+ } finally {
+ spark.stop()
}
- sql(s"DROP DATABASE IF EXISTS $reusedDb")
- spark.stop()
- super.afterAll()
}
override def beforeEach(): Unit = {
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
new file mode 100644
index 000000000..18cbecda8
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.kyuubi.plugin.spark.authz.OperationType._
+import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
+
+abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
+
+ protected val supportsUpdateTable: Boolean
+ protected val supportsMergeIntoTable: Boolean
+ protected val supportsDelete: Boolean
+ protected val supportsPartitionGrammar: Boolean
+ protected val supportsPartitionManagement: Boolean
+
+ val catalogV2 = "local"
+ val namespace = "catalog_ns"
+ val catalogTable = s"$catalogV2.$namespace.catalog_table"
+ val catalogTableShort = catalogTable.split("\\.").last
+ val catalogPartTable = s"$catalogV2.$namespace.catalog_part_table"
+ val catalogPartTableShort = catalogPartTable.split("\\.").last
+ val defaultV2TableOwner =
UserGroupInformation.getCurrentUser.getShortUserName
+
+ protected def withV2Table(table: String)(f: String => Unit): Unit = {
+ val tableId = s"$catalogV2.$namespace.$table"
+ try {
+ f(tableId)
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $tableId")
+ }
+ }
+
+ protected def checkV2TableOwner(po: PrivilegeObject): Unit = {
+ checkTableOwner(po, defaultV2TableOwner)
+ }
+
+ protected def withV2Table(table1: String, table2: String)(f: (String,
String) => Unit): Unit = {
+ val tableId1 = s"$catalogV2.$namespace.$table1"
+ val tableId2 = s"$catalogV2.$namespace.$table2"
+ try {
+ f(tableId1, tableId2)
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $tableId1")
+ sql(s"DROP TABLE IF EXISTS $tableId2")
+ }
+ }
+
+ protected def executePlan(sql: String): QueryExecution = {
+ val parsed = spark.sessionState.sqlParser.parsePlan(sql)
+ spark.sessionState.executePlan(parsed)
+ }
+
+ override def beforeAll(): Unit = {
+ if (spark.conf.getOption(s"spark.sql.catalog.$catalogV2").isDefined) {
+ sql(s"CREATE DATABASE IF NOT EXISTS $catalogV2.$namespace")
+ sql(
+ s"CREATE TABLE IF NOT EXISTS $catalogTable (key int, value String)")
+ if (supportsPartitionGrammar) {
+ sql(
+ s"CREATE TABLE IF NOT EXISTS $catalogPartTable (key int, value
String, dt String)" +
+ s" PARTITIONED BY (dt)")
+ }
+ }
+
+ super.beforeAll()
+ }
+
+ test("CreateTable") {
+ val table = "CreateTable"
+ withV2Table(table) { tableId =>
+ val plan = executePlan(
+ s"CREATE TABLE IF NOT EXISTS $tableId (i int)").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === CREATETABLE)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ assert(po.owner.isEmpty)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.CREATE)
+ }
+ }
+
+ test("CreateTableAsSelect") {
+ val table = "CreateTableAsSelect"
+ withV2Table(table) { tableId =>
+ val plan = executePlan(
+ s"CREATE TABLE IF NOT EXISTS $tableId AS " +
+ s"SELECT * FROM $reusedTable").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === CREATETABLE)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 1)
+ val po0 = inputs.head
+ assert(po0.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po0.dbname equalsIgnoreCase reusedDb)
+ assert(po0.objectName equalsIgnoreCase reusedTableShort)
+ assert(po0.columns.take(2) === Seq("key", "value"))
+ checkTableOwner(po0)
+
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ assert(po.owner.isEmpty)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.CREATE)
+ }
+ }
+
+ test("ReplaceTable") {
+ val table = "ReplaceTable"
+ withV2Table(table) { tableId =>
+ sql(s"CREATE TABLE IF NOT EXISTS $tableId (i int)")
+ val plan = executePlan(s"REPLACE TABLE $tableId (j int)").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === CREATETABLE)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 0)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ assert(po.owner.isEmpty)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.CREATE)
+ }
+ }
+
+ test("ReplaceTableAsSelect") {
+ val table = "ReplaceTableAsSelect"
+ withV2Table(table) { tableId =>
+ sql(s"CREATE TABLE IF NOT EXISTS $tableId (i int)")
+ val plan =
+ executePlan(s"REPLACE TABLE $tableId AS SELECT * FROM
$reusedTable").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === CREATETABLE)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 1)
+ val po0 = inputs.head
+ assert(po0.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po0.dbname equalsIgnoreCase reusedDb)
+ assert(po0.objectName equalsIgnoreCase reusedTableShort)
+ assert(po0.columns.take(2) === Seq("key", "value"))
+ checkTableOwner(po0)
+
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ assert(po.owner.isEmpty)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.CREATE)
+ }
+ }
+
+ // with V2WriteCommand
+
+ test("AppendData") {
+ val plan = executePlan(s"INSERT INTO $catalogTable VALUES (0,
'a')").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === QUERY)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 0)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.INSERT)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.UPDATE)
+ }
+
+ test("UpdateTable") {
+ assume(isSparkV32OrGreater)
+ assume(supportsUpdateTable)
+
+ val plan = executePlan(s"UPDATE $catalogTable SET value = 'a' WHERE key =
0").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === QUERY)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 0)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.UPDATE)
+ }
+
+ test("DeleteFromTable") {
+ assume(supportsDelete)
+
+ val plan = executePlan(s"DELETE FROM $catalogTable WHERE key = 0").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === QUERY)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 0)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.UPDATE)
+ }
+
+ test("OverwriteByExpression") {
+ // INSERT OVERWRITE under static partition-overwrite mode analyzes to
+ // OverwriteByExpression; expect one UPDATE-action output on the table.
+ val plan = executePlan(s"INSERT OVERWRITE TABLE $catalogTable VALUES (0,
1)").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === QUERY)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 0)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.UPDATE)
+ }
+
+ test("OverwritePartitionsDynamic") {
+ assume(supportsPartitionGrammar)
+
+ // Dynamic partition-overwrite mode is required for the analyzer to produce
+ // OverwritePartitionsDynamic; restore the static default in the finally.
+ try {
+ sql("SET spark.sql.sources.partitionOverwriteMode=dynamic")
+ // NOTE(review): "(dt)" + "VALUES" concatenates without a space; the SQL
+ // lexer still splits on ")" so this parses — confirm intent.
+ val plan = executePlan(s"INSERT OVERWRITE TABLE $catalogPartTable
PARTITION (dt)" +
+ s"VALUES (0, 1, '2022-01-01')").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === QUERY)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 0)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogPartTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.UPDATE)
+ } finally {
+ sql("SET spark.sql.sources.partitionOverwriteMode=static")
+ }
+ }
+
+ test("AddPartitions") {
+ assume(supportsPartitionManagement)
+ assume(isSparkV32OrGreater)
+
+ // ALTER TABLE ... ADD PARTITION maps to ALTERTABLE_ADDPARTS and an
+ // ALTER access type on the partitioned table.
+ val plan = executePlan(s"ALTER TABLE $catalogPartTable " +
+ s"ADD PARTITION (dt='2022-01-01')").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === ALTERTABLE_ADDPARTS)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 0)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogPartTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+
+ test("DropPartitions") {
+ assume(supportsPartitionManagement)
+ assume(isSparkV32OrGreater)
+
+ // ALTER TABLE ... DROP PARTITION maps to ALTERTABLE_DROPPARTS and an
+ // ALTER access type on the partitioned table.
+ val plan = executePlan(s"ALTER TABLE $catalogPartTable " +
+ s"DROP PARTITION (dt='2022-01-01')").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === ALTERTABLE_DROPPARTS)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 0)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogPartTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+
+ test("RenamePartitions") {
+ assume(supportsPartitionManagement)
+ assume(isSparkV32OrGreater)
+
+ // NOTE(review): a partition RENAME is asserted to map to
+ // ALTERTABLE_ADDPARTS — confirm this matches the v2Commands mapping
+ // rather than a dedicated rename-partition operation type.
+ val plan = executePlan(s"ALTER TABLE $catalogPartTable " +
+ s"PARTITION (dt='2022-01-01') RENAME TO PARTITION
(dt='2022-01-02')").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === ALTERTABLE_ADDPARTS)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 0)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogPartTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+
+ test("TruncatePartition") {
+ assume(supportsPartitionManagement)
+ assume(isSparkV32OrGreater)
+
+ // Fix copy-paste from the RenamePartitions test: this test must execute a
+ // TRUNCATE so the analyzed plan is TruncatePartition; with the previous
+ // RENAME statement the plan was RenamePartitions, whose operation type is
+ // ALTERTABLE_ADDPARTS, so the ALTERTABLE_DROPPARTS assertion below could
+ // never pass.
+ val plan = executePlan(s"TRUNCATE TABLE $catalogPartTable " +
+ s"PARTITION (dt='2022-01-01')").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === ALTERTABLE_DROPPARTS)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size === 0)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogPartTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+
+ // other table commands
+
+ test("CommentOnTable") {
+ // COMMENT ON TABLE maps to ALTERTABLE_PROPERTIES with an ALTER access
+ // type on the commented table.
+ val plan = executePlan(s"COMMENT ON TABLE $catalogTable IS
'text'").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === ALTERTABLE_PROPERTIES)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+
+ test("DropTable") {
+ // Creates a scratch v2 table, drops it, and checks the single output
+ // privilege object carries DROPTABLE / DROP access on that table.
+ val table = "DropTable"
+ withV2Table(table) { tableId =>
+ sql(s"CREATE TABLE $tableId (i int)")
+ val plan = executePlan(s"DROP TABLE $tableId").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === DROPTABLE)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.DROP)
+ }
+ }
+
+ test("MergeIntoTable") {
+ assume(supportsMergeIntoTable)
+
+ // MERGE INTO should produce one input privilege object for the source
+ // table (with its selected columns) and one UPDATE-action output for the
+ // target table.
+ val table = "MergeIntoTable"
+ withV2Table(table) { tableId =>
+ sql(s"CREATE TABLE $tableId (key int, value String)")
+ val plan = executePlan(s"MERGE INTO $tableId t " +
+ s"USING (SELECT * FROM $catalogTable) s " +
+ s"ON t.key = s.key " +
+ s"WHEN MATCHED THEN UPDATE SET t.value = s.value " +
+ s"WHEN NOT MATCHED THEN INSERT *").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === QUERY)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.size == 1)
+ val po0 = inputs.head
+ assert(po0.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po0.dbname === namespace)
+ assert(po0.objectName === catalogTableShort)
+ assert(po0.columns === Seq("key", "value"))
+ checkV2TableOwner(po0)
+
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.UPDATE)
+ }
+ }
+
+ test("RepairTable") {
+ assume(supportsPartitionGrammar)
+ assume(isSparkV32OrGreater)
+
+ // MSCK REPAIR TABLE maps to ALTERTABLE_ADDPARTS with an ALTER access
+ // type on the repaired table.
+ val plan = executePlan(s"MSCK REPAIR TABLE $catalogPartTable").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === ALTERTABLE_ADDPARTS)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogPartTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+
+ test("TruncateTable") {
+ assume(isSparkV32OrGreater)
+
+ // TRUNCATE TABLE produces one UPDATE-action output privilege object on
+ // the truncated table.
+ val plan = executePlan(s"TRUNCATE TABLE $catalogTable").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === QUERY)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === catalogTableShort)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.UPDATE)
+ }
+
+ // with V2AlterTableCommand
+
+ test("AddColumns") {
+ assume(isSparkV32OrGreater)
+
+ // ALTER TABLE ... ADD COLUMNS maps to ALTERTABLE_ADDCOLS with an ALTER
+ // access type on the altered table.
+ val table = "AddColumns"
+ withV2Table(table) { tableId =>
+ sql(s"CREATE TABLE $tableId (i int)")
+ val plan = executePlan(s"ALTER TABLE $tableId ADD COLUMNS (j
int)").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === ALTERTABLE_ADDCOLS)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+ }
+
+ test("AlterColumn") {
+ assume(isSparkV32OrGreater)
+
+ // NOTE(review): AlterColumn is asserted to map to ALTERTABLE_ADDCOLS —
+ // confirm this matches the v2Commands mapping rather than a dedicated
+ // alter/replace-column operation type.
+ val table = "AlterColumn"
+ withV2Table(table) { tableId =>
+ sql(s"CREATE TABLE $tableId (i int)")
+ val plan = executePlan(s"ALTER TABLE $tableId ALTER COLUMN i TYPE
int").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === ALTERTABLE_ADDCOLS)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+ }
+
+ test("DropColumns") {
+ assume(isSparkV32OrGreater)
+
+ // NOTE(review): DROP COLUMN is asserted to map to ALTERTABLE_ADDCOLS —
+ // confirm this matches the v2Commands mapping.
+ val table = "DropColumns"
+ withV2Table(table) { tableId =>
+ sql(s"CREATE TABLE $tableId (i int, j int)")
+ val plan = executePlan(s"ALTER TABLE $tableId DROP COLUMN i").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === ALTERTABLE_ADDCOLS)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+ }
+
+ test("ReplaceColumns") {
+ assume(isSparkV32OrGreater)
+
+ // ALTER TABLE ... REPLACE COLUMNS maps to ALTERTABLE_REPLACECOLS with an
+ // ALTER access type on the altered table.
+ val table = "ReplaceColumns"
+ withV2Table(table) { tableId =>
+ sql(s"CREATE TABLE $tableId (i int, j int)")
+ val plan = executePlan(s"ALTER TABLE $tableId REPLACE COLUMNS (i
String)").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === ALTERTABLE_REPLACECOLS)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+ }
+
+ test("RenameColumn") {
+ assume(isSparkV32OrGreater)
+
+ // ALTER TABLE ... RENAME COLUMN maps to ALTERTABLE_RENAMECOL with an
+ // ALTER access type on the altered table.
+ val table = "RenameColumn"
+ withV2Table(table) { tableId =>
+ sql(s"CREATE TABLE $tableId (i int, j int)")
+ val plan = executePlan(s"ALTER TABLE $tableId RENAME COLUMN i TO
k").analyzed
+ assert(v2Commands.accept(plan.nodeName))
+ val operationType = OperationType(plan.nodeName)
+ assert(operationType === ALTERTABLE_RENAMECOL)
+
+ val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+ assert(inputs.isEmpty)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.dbname === namespace)
+ assert(po.objectName === table)
+ assert(po.columns.isEmpty)
+ checkV2TableOwner(po)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+ }
+}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2JdbcTableCatalogPrivilegesBuilderSuite.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2JdbcTableCatalogPrivilegesBuilderSuite.scala
new file mode 100644
index 000000000..7ffaf29db
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2JdbcTableCatalogPrivilegesBuilderSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.plugin.spark.authz
+
+import java.sql.DriverManager
+
+import scala.util.Try
+
+import org.scalatest.Outcome
+
+/**
+ * Runs the shared V2 command privilege tests against Spark's built-in
+ * JDBCTableCatalog backed by an in-memory Derby database. The JDBC catalog
+ * supports UPDATE/MERGE/DELETE but exposes no partition grammar or
+ * partition management, so those tests are skipped via the flags below.
+ */
+class V2JdbcTableCatalogPrivilegesBuilderSuite extends
V2CommandsPrivilegesSuite {
+ override protected val catalogImpl: String = "in-memory"
+
+ override protected val supportsUpdateTable = true
+ override protected val supportsMergeIntoTable = true
+ override protected val supportsDelete = true
+ override protected val supportsPartitionGrammar = false
+ override protected val supportsPartitionManagement = false
+
+ // Base Derby URL; ";create=true" makes the first connection create the db.
+ val dbUrl = s"jdbc:derby:memory:$catalogV2"
+ val jdbcUrl: String = s"$dbUrl;create=true"
+
+ // Register the JDBC v2 catalog only on Spark 3.1+, where JDBCTableCatalog
+ // exists; withFixture below skips every test on older versions.
+ override def beforeAll(): Unit = {
+ if (isSparkV31OrGreater) {
+ spark.conf.set(
+ s"spark.sql.catalog.$catalogV2",
+ "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
+ spark.conf.set(s"spark.sql.catalog.$catalogV2.url", jdbcUrl)
+ spark.conf.set(
+ s"spark.sql.catalog.$catalogV2.driver",
+ "org.apache.derby.jdbc.AutoloadedDriver")
+ }
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+
+ // cleanup db; Derby signals a successful in-memory shutdown by throwing,
+ // so the exception is deliberately swallowed with Try.
+ Try {
+ DriverManager.getConnection(s"$dbUrl;shutdown=true")
+ }
+ }
+
+ // Skip (not fail) every test when the JDBC catalog is unavailable.
+ override def withFixture(test: NoArgTest): Outcome = {
+ assume(isSparkV31OrGreater)
+ test()
+ }
+}