This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 40eed8c4a [KYUUBI #3675][Subtask] Get table owner for DDL/DML v2 
commands
40eed8c4a is described below

commit 40eed8c4a7e7a68b47a94e1174df6426e10d5f41
Author: zhouyifan279 <[email protected]>
AuthorDate: Tue Oct 25 14:45:54 2022 +0800

    [KYUUBI #3675][Subtask] Get table owner for DDL/DML v2 commands
    
    ### _Why are the changes needed?_
    Subtask of #3607
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run 
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #3683 from zhouyifan279/3675.
    
    Closes #3675
    
    45b960a4 [zhouyifan279] [KYUUBI #3675][Subtask] Get table owner for DDL/DML 
v2 commands
    d1b4a35f [zhouyifan279] [KYUUBI #3675][Subtask] Get table owner for DDL/DML 
v2 commands
    0ef3b986 [zhouyifan279] [KYUUBI #3675][Subtask] Get table owner for DDL/DML 
v2 commands
    
    Authored-by: zhouyifan279 <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../plugin/spark/authz/IcebergCommands.scala       |  14 +-
 .../plugin/spark/authz/util/AuthZUtils.scala       |   2 +-
 .../kyuubi/plugin/spark/authz/v2Commands.scala     | 118 ++--
 .../IcebergCatalogPrivilegesBuilderSuite.scala     | 133 ++++
 .../spark/authz/PrivilegesBuilderSuite.scala       |  18 +-
 .../spark/authz/V2CommandsPrivilegesSuite.scala    | 671 +++++++++++++++++++++
 .../V2JdbcTableCatalogPrivilegesBuilderSuite.scala |  63 ++
 7 files changed, 966 insertions(+), 53 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala
index fae9e6a02..b2b69be12 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCommands.scala
@@ -73,6 +73,7 @@ object IcebergCommands extends Enumeration {
    * @param buildInput       input [[PrivilegeObject]] for privilege check
    * @param buildOutput      output [[PrivilegeObject]] for privilege check
    * @param outputActionType [[PrivilegeObjectActionType]] for output 
[[PrivilegeObject]]
+   * @param resolveOutputTableOwner Whether to resolve table owner for output 
[[PrivilegeObject]]
    */
   case class CmdPrivilegeBuilder(
       operationType: OperationType = QUERY,
@@ -85,8 +86,10 @@ object IcebergCommands extends Enumeration {
           LogicalPlan,
           ArrayBuffer[PrivilegeObject],
           Seq[CommandType],
-          PrivilegeObjectActionType) => Unit = v2Commands.defaultBuildOutput,
-      outputActionType: PrivilegeObjectActionType = 
PrivilegeObjectActionType.OTHER)
+          PrivilegeObjectActionType,
+          Boolean) => Unit = v2Commands.defaultBuildOutput,
+      outputActionType: PrivilegeObjectActionType = 
PrivilegeObjectActionType.OTHER,
+      resolveOutputTableOwner: Boolean = true)
     extends super.Val {
 
     def buildPrivileges(
@@ -94,7 +97,12 @@ object IcebergCommands extends Enumeration {
         inputObjs: ArrayBuffer[PrivilegeObject],
         outputObjs: ArrayBuffer[PrivilegeObject]): Unit = {
       this.buildInput(plan, inputObjs, commandTypes)
-      this.buildOutput(plan, outputObjs, commandTypes, outputActionType)
+      this.buildOutput(
+        plan,
+        outputObjs,
+        commandTypes,
+        outputActionType,
+        resolveOutputTableOwner)
     }
   }
 
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
index 801978ea9..5751c4630 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala
@@ -40,7 +40,7 @@ private[authz] object AuthZUtils {
       case Success(value) => value.asInstanceOf[T]
       case Failure(e) =>
         val candidates = 
o.getClass.getDeclaredFields.map(_.getName).mkString("[", ",", "]")
-        throw new RuntimeException(s"$name not in $candidates", e)
+        throw new RuntimeException(s"$name not in ${o.getClass} $candidates", 
e)
     }
   }
 
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
index e7e647b53..05f3018a3 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/v2Commands.scala
@@ -85,29 +85,41 @@ object v2Commands extends Enumeration {
       LogicalPlan,
       ArrayBuffer[PrivilegeObject],
       Seq[CommandType],
-      PrivilegeObjectActionType) => Unit =
-    (plan, outputObjs, commandTypes, outputObjsActionType) => {
+      PrivilegeObjectActionType,
+      Boolean) => Unit =
+    (plan, outputObjs, commandTypes, outputObjsActionType, resolveTableOwner) 
=> {
       commandTypes.foreach {
         case HasTableNameAsIdentifier =>
           val table = invoke(plan, "tableName").asInstanceOf[Identifier]
-          outputObjs += v2TablePrivileges(table)
+          val owner = if (resolveTableOwner) getTableOwnerFromV2Plan(plan, 
table) else None
+          outputObjs += v2TablePrivileges(table, owner = owner)
 
         case HasTableAsIdentifierOption =>
-          val table = getFieldVal[AnyRef](plan, "table")
-          val tableIdent = getFieldVal[Option[Identifier]](table, "identifier")
+          val datasourceV2Relation = getFieldVal[LogicalPlan](plan, "table")
+            .find(_.isInstanceOf[DataSourceV2Relation])
+          val tableIdent = datasourceV2Relation.flatMap { r =>
+            getFieldVal[Option[Identifier]](r, "identifier")
+          }
           if (tableIdent.isDefined) {
-            outputObjs += v2TablePrivileges(tableIdent.get, actionType = 
outputObjsActionType)
+            val owner =
+              if (resolveTableOwner) 
getDatasourceV2TableOwner(datasourceV2Relation.get) else None
+            outputObjs += v2TablePrivileges(
+              tableIdent.get,
+              owner = owner,
+              actionType = outputObjsActionType)
           }
 
         case HasTableAsIdentifier =>
-          val table = getFieldVal[LogicalPlan](plan, "table")
-          val tableIdent = getFieldVal[Identifier](table, "identifier")
-          outputObjs += v2TablePrivileges(tableIdent)
+          val resolvedTable = getFieldVal[LogicalPlan](plan, "table")
+          val tableIdent = getFieldVal[Identifier](resolvedTable, "identifier")
+          val owner = if (resolveTableOwner) 
getDatasourceV2TableOwner(resolvedTable) else None
+          outputObjs += v2TablePrivileges(tableIdent, owner = owner)
 
         case HasChildAsIdentifier =>
-          val table = getFieldVal[AnyRef](plan, "child")
-          val tableIdent = getFieldVal[Identifier](table, "identifier")
-          outputObjs += v2TablePrivileges(tableIdent)
+          val resolvedTable = getFieldVal[LogicalPlan](plan, "child")
+          val tableIdent = getFieldVal[Identifier](resolvedTable, "identifier")
+          val owner = if (resolveTableOwner) 
getDatasourceV2TableOwner(resolvedTable) else None
+          outputObjs += v2TablePrivileges(tableIdent, owner = owner)
 
         case _ =>
       }
@@ -123,6 +135,7 @@ object v2Commands extends Enumeration {
    * @param buildInput       input [[PrivilegeObject]] for privilege check
    * @param buildOutput      output [[PrivilegeObject]] for privilege check
    * @param outputActionType [[PrivilegeObjectActionType]] for output 
[[PrivilegeObject]]
+   * @param resolveOutputTableOwner Whether to resolve table owner for output 
[[PrivilegeObject]]
    */
   case class CmdPrivilegeBuilder(
       operationType: OperationType = QUERY,
@@ -135,8 +148,10 @@ object v2Commands extends Enumeration {
           LogicalPlan,
           ArrayBuffer[PrivilegeObject],
           Seq[CommandType],
-          PrivilegeObjectActionType) => Unit = defaultBuildOutput,
-      outputActionType: PrivilegeObjectActionType = 
PrivilegeObjectActionType.OTHER)
+          PrivilegeObjectActionType,
+          Boolean) => Unit = defaultBuildOutput,
+      outputActionType: PrivilegeObjectActionType = 
PrivilegeObjectActionType.OTHER,
+      resolveOutputTableOwner: Boolean = true)
     extends super.Val {
 
     def buildPrivileges(
@@ -144,7 +159,12 @@ object v2Commands extends Enumeration {
         inputObjs: ArrayBuffer[PrivilegeObject],
         outputObjs: ArrayBuffer[PrivilegeObject]): Unit = {
       this.buildInput(plan, inputObjs, commandTypes)
-      this.buildOutput(plan, outputObjs, commandTypes, outputActionType)
+      this.buildOutput(
+        plan,
+        outputObjs,
+        commandTypes,
+        outputActionType,
+        resolveOutputTableOwner)
     }
   }
 
@@ -166,7 +186,7 @@ object v2Commands extends Enumeration {
 
   val CreateNamespace: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     operationType = CREATEDATABASE,
-    buildOutput = (plan, outputObjs, _, _) => {
+    buildOutput = (plan, outputObjs, _, _, _) => {
       if (isSparkVersionAtLeast("3.3")) {
         val resolvedNamespace = getFieldVal[Any](plan, "name")
         val databases = getFieldVal[Seq[String]](resolvedNamespace, 
"nameParts")
@@ -179,7 +199,7 @@ object v2Commands extends Enumeration {
 
   val DropNamespace: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     operationType = DROPDATABASE,
-    buildOutput = (plan, outputObjs, _, _) => {
+    buildOutput = (plan, outputObjs, _, _, _) => {
       val resolvedNamespace = getFieldVal[LogicalPlan](plan, "namespace")
       val databases = getFieldVal[Seq[String]](resolvedNamespace, "namespace")
       outputObjs += databasePrivileges(quote(databases))
@@ -190,24 +210,29 @@ object v2Commands extends Enumeration {
   val CreateTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     operationType = CREATETABLE,
     commandTypes = Seq(HasTableNameAsIdentifier),
-    leastVer = Some("3.3"))
+    leastVer = Some("3.3"),
+    resolveOutputTableOwner = false)
 
   val CreateV2Table: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     operationType = CREATETABLE,
     commandTypes = Seq(HasTableNameAsIdentifier),
-    mostVer = Some("3.2"))
+    mostVer = Some("3.2"),
+    resolveOutputTableOwner = false)
 
   val CreateTableAsSelect: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     operationType = CREATETABLE,
-    commandTypes = Seq(HasTableNameAsIdentifier, HasQueryAsLogicalPlan))
+    commandTypes = Seq(HasTableNameAsIdentifier, HasQueryAsLogicalPlan),
+    resolveOutputTableOwner = false)
 
   val ReplaceTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     operationType = CREATETABLE,
-    commandTypes = Seq(HasTableNameAsIdentifier))
+    commandTypes = Seq(HasTableNameAsIdentifier),
+    resolveOutputTableOwner = false)
 
   val ReplaceTableAsSelect: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     operationType = CREATETABLE,
-    commandTypes = Seq(HasTableNameAsIdentifier, HasQueryAsLogicalPlan))
+    commandTypes = Seq(HasTableNameAsIdentifier, HasQueryAsLogicalPlan),
+    resolveOutputTableOwner = false)
 
   // with V2WriteCommand
 
@@ -273,7 +298,7 @@ object v2Commands extends Enumeration {
 
   val CommentOnNamespace: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     operationType = ALTERDATABASE,
-    buildOutput = (plan, outputObjs, _, _) => {
+    buildOutput = (plan, outputObjs, _, _, _) => {
       val resolvedNamespace = getFieldVal[AnyRef](plan, "child")
       val namespace = getFieldVal[Seq[String]](resolvedNamespace, "namespace")
       outputObjs += databasePrivileges(quote(namespace))
@@ -286,27 +311,31 @@ object v2Commands extends Enumeration {
 
   val DropTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     operationType = DROPTABLE,
-    buildOutput = (plan, outputObjs, _, _) => {
-      val tableIdent =
+    buildOutput = (plan, outputObjs, _, _, _) => {
+      val (tableIdent, owner) =
         if (isSparkVersionAtLeast("3.1")) {
           val resolvedTable = getFieldVal[LogicalPlan](plan, "child")
-          getFieldVal[Identifier](resolvedTable, "identifier")
+          (
+            getFieldVal[Identifier](resolvedTable, "identifier"),
+            getDatasourceV2TableOwner(resolvedTable))
         } else {
-          getFieldVal[Identifier](plan, "ident")
+          (getFieldVal[Identifier](plan, "ident"), None)
         }
-      outputObjs += v2TablePrivileges(tableIdent)
+      outputObjs += v2TablePrivileges(tableIdent, owner = owner)
     })
   val MergeIntoTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     buildInput = (plan, inputObjs, _) => {
-      val table = getFieldVal[DataSourceV2Relation](plan, "sourceTable")
+      val table = getFieldVal[LogicalPlan](plan, "sourceTable")
       buildQuery(table, inputObjs)
     },
-    buildOutput = (plan, outputObjs, _, _) => {
-      val table = getFieldVal[DataSourceV2Relation](plan, "targetTable")
-      if (table.identifier.isDefined) {
-        outputObjs += v2TablePrivileges(
-          table.identifier.get,
-          actionType = PrivilegeObjectActionType.UPDATE)
+    buildOutput = (plan, outputObjs, _, _, _) => {
+      val aliasOrRelation = getFieldVal[LogicalPlan](plan, "targetTable")
+      aliasOrRelation.find(_.isInstanceOf[DataSourceV2Relation]).foreach {
+        case table: DataSourceV2Relation if table.identifier.isDefined =>
+          outputObjs += v2TablePrivileges(
+            table.identifier.get,
+            owner = getDatasourceV2TableOwner(table),
+            actionType = PrivilegeObjectActionType.UPDATE)
       }
     })
 
@@ -317,10 +346,14 @@ object v2Commands extends Enumeration {
 
   val TruncateTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     leastVer = Some("3.2"),
-    buildOutput = (plan, outputObjs, _, _) => {
-      val table = getFieldVal[Any](plan, "table")
-      val tableIdent = getFieldVal[Identifier](table, "identifier")
-      outputObjs += v2TablePrivileges(tableIdent, actionType = 
PrivilegeObjectActionType.UPDATE)
+    buildOutput = (plan, outputObjs, _, _, _) => {
+      val resolvedTable = getFieldVal[LogicalPlan](plan, "table")
+      val tableIdent = getFieldVal[Identifier](resolvedTable, "identifier")
+      val owner = getDatasourceV2TableOwner(resolvedTable)
+      outputObjs += v2TablePrivileges(
+        tableIdent,
+        owner = owner,
+        actionType = PrivilegeObjectActionType.UPDATE)
     })
 
   // with V2AlterTableCommand
@@ -328,11 +361,12 @@ object v2Commands extends Enumeration {
   val AlterTable: CmdPrivilegeBuilder = CmdPrivilegeBuilder(
     operationType = ALTERTABLE_ADDCOLS,
     mostVer = Some("3.1"),
-    buildOutput = (plan, outputObjs, _, _) => {
-      val table = getFieldVal[Any](plan, "table")
+    buildOutput = (plan, outputObjs, _, _, _) => {
+      val table = getFieldVal[LogicalPlan](plan, "table")
       val tableIdent = getFieldVal[Option[Identifier]](table, "identifier")
       if (tableIdent.isDefined) {
-        outputObjs += v2TablePrivileges(tableIdent.get)
+        val owner = getDatasourceV2TableOwner(table)
+        outputObjs += v2TablePrivileges(tableIdent.get, owner = owner)
       }
     })
 
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
new file mode 100644
index 000000000..a5ec2cfcc
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz
+
+import org.scalatest.Outcome
+
+import org.apache.kyuubi.Utils
+import org.apache.kyuubi.plugin.spark.authz.OperationType._
+import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
+
+class IcebergCatalogPrivilegesBuilderSuite extends V2CommandsPrivilegesSuite {
+  override protected val catalogImpl: String = "hive"
+  override protected val sqlExtensions: String =
+    if (isSparkV32OrGreater) {
+      "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
+    } else ""
+  override protected def format = "iceberg"
+
+  override protected val supportsUpdateTable = false
+  override protected val supportsMergeIntoTable = false
+  override protected val supportsDelete = false
+  override protected val supportsPartitionGrammar = true
+  override protected val supportsPartitionManagement = false
+
+  override def beforeAll(): Unit = {
+    if (isSparkV32OrGreater) {
+      spark.conf.set(
+        s"spark.sql.catalog.$catalogV2",
+        "org.apache.iceberg.spark.SparkCatalog")
+      spark.conf.set(s"spark.sql.catalog.$catalogV2.type", "hadoop")
+      spark.conf.set(
+        s"spark.sql.catalog.$catalogV2.warehouse",
+        Utils.createTempDir("iceberg-hadoop").toString)
+    }
+    super.beforeAll()
+  }
+
+  override def withFixture(test: NoArgTest): Outcome = {
+    assume(isSparkV32OrGreater)
+    test()
+  }
+
+  test("DeleteFromIcebergTable") {
+    val plan = sql(s"DELETE FROM $catalogTable WHERE key = 1 
").queryExecution.analyzed
+    assert(IcebergCommands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === QUERY)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.isEmpty)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.UPDATE)
+  }
+
+  test("UpdateIcebergTable") {
+    val plan = sql(s"UPDATE $catalogTable SET value = 'b' WHERE key = 1 
").queryExecution.analyzed
+    assert(IcebergCommands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === QUERY)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.isEmpty)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.UPDATE)
+  }
+
+  test("MergeIntoIcebergTable") {
+    val table = "MergeIntoIcebergTable"
+    withV2Table(table) { tableId =>
+      sql(s"CREATE TABLE $tableId (key int, value String) USING iceberg")
+      val plan = sql(s"MERGE INTO $tableId t " +
+        s"USING (SELECT * FROM $catalogTable) s " +
+        s"ON t.key = s.key " +
+        s"WHEN MATCHED THEN UPDATE SET t.value = s.value " +
+        s"WHEN NOT MATCHED THEN INSERT *").queryExecution.analyzed
+      assert(IcebergCommands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === QUERY)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.size === 1)
+      val po0 = inputs.head
+      assert(po0.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po0.dbname === namespace)
+      assert(po0.objectName === catalogTableShort)
+      assert(po0.columns === Seq("key", "value"))
+      checkV2TableOwner(po0)
+
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      checkV2TableOwner(po)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.UPDATE)
+    }
+  }
+}
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
index 89be44630..3d4bc3db6 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
@@ -67,10 +67,12 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     checkColumns(sql(query).queryExecution.optimizedPlan, cols)
   }
 
-  protected def checkTableOwner(po: PrivilegeObject): Unit = {
+  protected def checkTableOwner(
+      po: PrivilegeObject,
+      expectedOwner: String = defaultTableOwner): Unit = {
     if (catalogImpl == "hive" && po.privilegeObjectType === 
PrivilegeObjectType.TABLE_OR_VIEW) {
       assert(po.owner.isDefined)
-      assert(po.owner.get === defaultTableOwner)
+      assert(po.owner.get === expectedOwner)
     }
   }
 
@@ -91,12 +93,14 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
   }
 
   override def afterAll(): Unit = {
-    Seq(reusedTable, reusedPartTable).foreach { t =>
-      sql(s"DROP TABLE IF EXISTS $t")
+    try {
+      Seq(reusedTable, reusedPartTable).foreach { t =>
+        sql(s"DROP TABLE IF EXISTS $t")
+      }
+      sql(s"DROP DATABASE IF EXISTS $reusedDb")
+    } finally {
+      spark.stop()
     }
-    sql(s"DROP DATABASE IF EXISTS $reusedDb")
-    spark.stop()
-    super.afterAll()
   }
 
   override def beforeEach(): Unit = {
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
new file mode 100644
index 000000000..18cbecda8
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
@@ -0,0 +1,671 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz
+
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.kyuubi.plugin.spark.authz.OperationType._
+import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
+
+abstract class V2CommandsPrivilegesSuite extends PrivilegesBuilderSuite {
+
+  protected val supportsUpdateTable: Boolean
+  protected val supportsMergeIntoTable: Boolean
+  protected val supportsDelete: Boolean
+  protected val supportsPartitionGrammar: Boolean
+  protected val supportsPartitionManagement: Boolean
+
+  val catalogV2 = "local"
+  val namespace = "catalog_ns"
+  val catalogTable = s"$catalogV2.$namespace.catalog_table"
+  val catalogTableShort = catalogTable.split("\\.").last
+  val catalogPartTable = s"$catalogV2.$namespace.catalog_part_table"
+  val catalogPartTableShort = catalogPartTable.split("\\.").last
+  val defaultV2TableOwner = 
UserGroupInformation.getCurrentUser.getShortUserName
+
+  protected def withV2Table(table: String)(f: String => Unit): Unit = {
+    val tableId = s"$catalogV2.$namespace.$table"
+    try {
+      f(tableId)
+    } finally {
+      sql(s"DROP TABLE IF EXISTS $tableId")
+    }
+  }
+
+  protected def checkV2TableOwner(po: PrivilegeObject): Unit = {
+    checkTableOwner(po, defaultV2TableOwner)
+  }
+
+  protected def withV2Table(table1: String, table2: String)(f: (String, 
String) => Unit): Unit = {
+    val tableId1 = s"$catalogV2.$namespace.$table1"
+    val tableId2 = s"$catalogV2.$namespace.$table2"
+    try {
+      f(tableId1, tableId2)
+    } finally {
+      sql(s"DROP TABLE IF EXISTS $tableId1")
+      sql(s"DROP TABLE IF EXISTS $tableId2")
+    }
+  }
+
+  protected def executePlan(sql: String): QueryExecution = {
+    val parsed = spark.sessionState.sqlParser.parsePlan(sql)
+    spark.sessionState.executePlan(parsed)
+  }
+
+  override def beforeAll(): Unit = {
+    if (spark.conf.getOption(s"spark.sql.catalog.$catalogV2").isDefined) {
+      sql(s"CREATE DATABASE IF NOT EXISTS $catalogV2.$namespace")
+      sql(
+        s"CREATE TABLE IF NOT EXISTS $catalogTable (key int, value String)")
+      if (supportsPartitionGrammar) {
+        sql(
+          s"CREATE TABLE IF NOT EXISTS $catalogPartTable (key int, value 
String, dt String)" +
+            s" PARTITIONED BY (dt)")
+      }
+    }
+
+    super.beforeAll()
+  }
+
+  test("CreateTable") {
+    val table = "CreateTable"
+    withV2Table(table) { tableId =>
+      val plan = executePlan(
+        s"CREATE TABLE IF NOT EXISTS $tableId (i int)").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === CREATETABLE)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.isEmpty)
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      assert(po.owner.isEmpty)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.CREATE)
+    }
+  }
+
+  test("CreateTableAsSelect") {
+    val table = "CreateTableAsSelect"
+    withV2Table(table) { tableId =>
+      val plan = executePlan(
+        s"CREATE TABLE IF NOT EXISTS $tableId AS " +
+          s"SELECT * FROM $reusedTable").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === CREATETABLE)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.size === 1)
+      val po0 = inputs.head
+      assert(po0.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po0.dbname equalsIgnoreCase reusedDb)
+      assert(po0.objectName equalsIgnoreCase reusedTableShort)
+      assert(po0.columns.take(2) === Seq("key", "value"))
+      checkTableOwner(po0)
+
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      assert(po.owner.isEmpty)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.CREATE)
+    }
+  }
+
+  test("ReplaceTable") {
+    val table = "ReplaceTable"
+    withV2Table(table) { tableId =>
+      sql(s"CREATE TABLE IF NOT EXISTS $tableId (i int)")
+      val plan = executePlan(s"REPLACE TABLE $tableId (j int)").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === CREATETABLE)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.size === 0)
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      assert(po.owner.isEmpty)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.CREATE)
+    }
+  }
+
+  test("ReplaceTableAsSelect") {
+    val table = "ReplaceTableAsSelect"
+    withV2Table(table) { tableId =>
+      sql(s"CREATE TABLE IF NOT EXISTS $tableId (i int)")
+      val plan =
+        executePlan(s"REPLACE TABLE $tableId AS SELECT * FROM 
$reusedTable").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === CREATETABLE)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.size === 1)
+      val po0 = inputs.head
+      assert(po0.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po0.dbname equalsIgnoreCase reusedDb)
+      assert(po0.objectName equalsIgnoreCase reusedTableShort)
+      assert(po0.columns.take(2) === Seq("key", "value"))
+      checkTableOwner(po0)
+
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      assert(po.owner.isEmpty)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.CREATE)
+    }
+  }
+
+  // with V2WriteCommand
+
+  test("AppendData") {
+    val plan = executePlan(s"INSERT INTO $catalogTable VALUES (0, 
'a')").analyzed
+    assert(v2Commands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === QUERY)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.size === 0)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.INSERT)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.UPDATE)
+  }
+
+  test("UpdateTable") {
+    assume(isSparkV32OrGreater)
+    assume(supportsUpdateTable)
+
+    val plan = executePlan(s"UPDATE $catalogTable SET value = 'a' WHERE key = 
0").analyzed
+    assert(v2Commands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === QUERY)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.size === 0)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.UPDATE)
+  }
+
+  test("DeleteFromTable") {
+    assume(supportsDelete)
+
+    val plan = executePlan(s"DELETE FROM $catalogTable WHERE key = 0").analyzed
+    assert(v2Commands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === QUERY)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.size === 0)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.UPDATE)
+  }
+
+  test("OverwriteByExpression") {
+    val plan = executePlan(s"INSERT OVERWRITE TABLE $catalogTable VALUES (0, 
1)").analyzed
+    assert(v2Commands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === QUERY)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.size === 0)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.UPDATE)
+  }
+
+  test("OverwritePartitionsDynamic") {
+    assume(supportsPartitionGrammar)
+
+    try {
+      sql("SET spark.sql.sources.partitionOverwriteMode=dynamic")
+      val plan = executePlan(s"INSERT OVERWRITE TABLE $catalogPartTable 
PARTITION (dt)" +
+        s"VALUES (0, 1, '2022-01-01')").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === QUERY)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.size === 0)
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === catalogPartTableShort)
+      assert(po.columns.isEmpty)
+      checkV2TableOwner(po)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.UPDATE)
+    } finally {
+      sql("SET spark.sql.sources.partitionOverwriteMode=static")
+    }
+  }
+
+  test("AddPartitions") {
+    assume(supportsPartitionManagement)
+    assume(isSparkV32OrGreater)
+
+    val plan = executePlan(s"ALTER TABLE $catalogPartTable " +
+      s"ADD PARTITION (dt='2022-01-01')").analyzed
+    assert(v2Commands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === ALTERTABLE_ADDPARTS)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.size === 0)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.OTHER)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogPartTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.ALTER)
+  }
+
+  test("DropPartitions") {
+    assume(supportsPartitionManagement)
+    assume(isSparkV32OrGreater)
+
+    val plan = executePlan(s"ALTER TABLE $catalogPartTable " +
+      s"DROP PARTITION (dt='2022-01-01')").analyzed
+    assert(v2Commands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === ALTERTABLE_DROPPARTS)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.size === 0)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.OTHER)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogPartTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.ALTER)
+  }
+
+  test("RenamePartitions") {
+    assume(supportsPartitionManagement)
+    assume(isSparkV32OrGreater)
+
+    val plan = executePlan(s"ALTER TABLE $catalogPartTable " +
+      s"PARTITION (dt='2022-01-01') RENAME TO PARTITION 
(dt='2022-01-02')").analyzed
+    assert(v2Commands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === ALTERTABLE_ADDPARTS)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.size === 0)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.OTHER)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogPartTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.ALTER)
+  }
+
+  test("TruncatePartition") {
+    assume(supportsPartitionManagement)
+    assume(isSparkV32OrGreater)
+
+    val plan = executePlan(s"ALTER TABLE $catalogPartTable " +
+      s"PARTITION (dt='2022-01-01') RENAME TO PARTITION 
(dt='2022-01-02')").analyzed
+    assert(v2Commands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === ALTERTABLE_DROPPARTS)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.size === 0)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.OTHER)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogPartTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.ALTER)
+  }
+
+  // other table commands
+
+  test("CommentOnTable") {
+    val plan = executePlan(s"COMMENT ON TABLE $catalogTable IS 
'text'").analyzed
+    assert(v2Commands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === ALTERTABLE_PROPERTIES)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.isEmpty)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.OTHER)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.ALTER)
+  }
+
+  test("DropTable") {
+    val table = "DropTable"
+    withV2Table(table) { tableId =>
+      sql(s"CREATE TABLE $tableId (i int)")
+      val plan = executePlan(s"DROP TABLE $tableId").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === DROPTABLE)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.isEmpty)
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      checkV2TableOwner(po)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.DROP)
+    }
+  }
+
+  test("MergeIntoTable") {
+    assume(supportsMergeIntoTable)
+
+    val table = "MergeIntoTable"
+    withV2Table(table) { tableId =>
+      sql(s"CREATE TABLE $tableId (key int, value String)")
+      val plan = executePlan(s"MERGE INTO $tableId t " +
+        s"USING (SELECT * FROM $catalogTable) s " +
+        s"ON t.key = s.key " +
+        s"WHEN MATCHED THEN UPDATE SET t.value = s.value " +
+        s"WHEN NOT MATCHED THEN INSERT *").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === QUERY)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.size == 1)
+      val po0 = inputs.head
+      assert(po0.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po0.dbname === namespace)
+      assert(po0.objectName === catalogTableShort)
+      assert(po0.columns === Seq("key", "value"))
+      checkV2TableOwner(po0)
+
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      checkV2TableOwner(po)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.UPDATE)
+    }
+  }
+
+  test("RepairTable") {
+    assume(supportsPartitionGrammar)
+    assume(isSparkV32OrGreater)
+
+    val plan = executePlan(s"MSCK REPAIR TABLE $catalogPartTable").analyzed
+    assert(v2Commands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === ALTERTABLE_ADDPARTS)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.isEmpty)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.OTHER)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogPartTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.ALTER)
+  }
+
+  test("TruncateTable") {
+    assume(isSparkV32OrGreater)
+
+    val plan = executePlan(s"TRUNCATE TABLE $catalogTable").analyzed
+    assert(v2Commands.accept(plan.nodeName))
+    val operationType = OperationType(plan.nodeName)
+    assert(operationType === QUERY)
+
+    val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+    assert(inputs.isEmpty)
+    assert(outputs.size === 1)
+    val po = outputs.head
+    assert(po.actionType === PrivilegeObjectActionType.UPDATE)
+    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+    assert(po.dbname === namespace)
+    assert(po.objectName === catalogTableShort)
+    assert(po.columns.isEmpty)
+    checkV2TableOwner(po)
+    val accessType = AccessType(po, operationType, isInput = false)
+    assert(accessType === AccessType.UPDATE)
+  }
+
+  // with V2AlterTableCommand
+
+  test("AddColumns") {
+    assume(isSparkV32OrGreater)
+
+    val table = "AddColumns"
+    withV2Table(table) { tableId =>
+      sql(s"CREATE TABLE $tableId (i int)")
+      val plan = executePlan(s"ALTER TABLE $tableId ADD COLUMNS (j 
int)").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === ALTERTABLE_ADDCOLS)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.isEmpty)
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      checkV2TableOwner(po)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.ALTER)
+    }
+  }
+
+  test("AlterColumn") {
+    assume(isSparkV32OrGreater)
+
+    val table = "AlterColumn"
+    withV2Table(table) { tableId =>
+      sql(s"CREATE TABLE $tableId (i int)")
+      val plan = executePlan(s"ALTER TABLE $tableId ALTER COLUMN i TYPE 
int").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === ALTERTABLE_ADDCOLS)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.isEmpty)
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      checkV2TableOwner(po)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.ALTER)
+    }
+  }
+
+  test("DropColumns") {
+    assume(isSparkV32OrGreater)
+
+    val table = "DropColumns"
+    withV2Table(table) { tableId =>
+      sql(s"CREATE TABLE $tableId (i int, j int)")
+      val plan = executePlan(s"ALTER TABLE $tableId DROP COLUMN i").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === ALTERTABLE_ADDCOLS)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.isEmpty)
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      checkV2TableOwner(po)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.ALTER)
+    }
+  }
+
+  test("ReplaceColumns") {
+    assume(isSparkV32OrGreater)
+
+    val table = "ReplaceColumns"
+    withV2Table(table) { tableId =>
+      sql(s"CREATE TABLE $tableId (i int, j int)")
+      val plan = executePlan(s"ALTER TABLE $tableId REPLACE COLUMNS (i 
String)").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === ALTERTABLE_REPLACECOLS)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.isEmpty)
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      checkV2TableOwner(po)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.ALTER)
+    }
+  }
+
+  test("RenameColumn") {
+    assume(isSparkV32OrGreater)
+
+    val table = "RenameColumn"
+    withV2Table(table) { tableId =>
+      sql(s"CREATE TABLE $tableId (i int, j int)")
+      val plan = executePlan(s"ALTER TABLE $tableId RENAME COLUMN i TO 
k").analyzed
+      assert(v2Commands.accept(plan.nodeName))
+      val operationType = OperationType(plan.nodeName)
+      assert(operationType === ALTERTABLE_RENAMECOL)
+
+      val (inputs, outputs) = PrivilegesBuilder.build(plan, spark)
+      assert(inputs.isEmpty)
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.dbname === namespace)
+      assert(po.objectName === table)
+      assert(po.columns.isEmpty)
+      checkV2TableOwner(po)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.ALTER)
+    }
+  }
+}
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2JdbcTableCatalogPrivilegesBuilderSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2JdbcTableCatalogPrivilegesBuilderSuite.scala
new file mode 100644
index 000000000..7ffaf29db
--- /dev/null
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2JdbcTableCatalogPrivilegesBuilderSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kyuubi.plugin.spark.authz
+
+import java.sql.DriverManager
+
+import scala.util.Try
+
+import org.scalatest.Outcome
+
+class V2JdbcTableCatalogPrivilegesBuilderSuite extends 
V2CommandsPrivilegesSuite {
+  override protected val catalogImpl: String = "in-memory"
+
+  override protected val supportsUpdateTable = true
+  override protected val supportsMergeIntoTable = true
+  override protected val supportsDelete = true
+  override protected val supportsPartitionGrammar = false
+  override protected val supportsPartitionManagement = false
+
+  val dbUrl = s"jdbc:derby:memory:$catalogV2"
+  val jdbcUrl: String = s"$dbUrl;create=true"
+
+  override def beforeAll(): Unit = {
+    if (isSparkV31OrGreater) {
+      spark.conf.set(
+        s"spark.sql.catalog.$catalogV2",
+        "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
+      spark.conf.set(s"spark.sql.catalog.$catalogV2.url", jdbcUrl)
+      spark.conf.set(
+        s"spark.sql.catalog.$catalogV2.driver",
+        "org.apache.derby.jdbc.AutoloadedDriver")
+    }
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+
+    // cleanup db
+    Try {
+      DriverManager.getConnection(s"$dbUrl;shutdown=true")
+    }
+  }
+
+  override def withFixture(test: NoArgTest): Outcome = {
+    assume(isSparkV31OrGreater)
+    test()
+  }
+}

Reply via email to