This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new f67140e65 [KYUUBI #5594][AUTHZ] BuildQuery should respect normal node's input
f67140e65 is described below

commit f67140e650d4fb335b43a65eb2678192c9b0b29a
Author: Angerszhuuuu <[email protected]>
AuthorDate: Sun Feb 4 16:39:55 2024 +0800

    [KYUUBI #5594][AUTHZ] BuildQuery should respect normal node's input
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #5594
    
    ## Describe Your Solution 🔧
    
    For the following case:
    ```
    def filter_func(iterator):
        for pdf in iterator:
            yield pdf[pdf.id == 1]

    df = spark.read.table("test_mapinpandas")
    execute_result = df.mapInPandas(filter_func, df.schema).show()
    ```
    
    The logical plan is:
    ```
    GlobalLimit 21
    +- LocalLimit 21
       +- Project [cast(id#5 as string) AS id#11, name#6]
          +- MapInPandas filter_func(id#0, name#1), [id#5, name#6]
             +- HiveTableRelation [`default`.`test_mapinpandas`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, Data Cols: [id#0, name#1], Partition Cols: []]
    ```
    When handling `MapInPandas`, we did not match its input against the underlying `HiveTableRelation`, so the input table's columns were missed. This PR fixes that.

    In this PR, we remove the dedicated branch for each node type such as `Project`, `Aggregate`, etc., and handle them all uniformly.
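The unified handling described above can be sketched in plain Python (a toy plan model; names such as `build_query` and `column_prune` are hypothetical, not actual Kyuubi code): instead of a branch per node type, every node's references are pruned against its child's output before recursing, so nodes like `MapInPandas` no longer fall through without contributing columns.

```python
# Illustrative sketch of generic column pruning over a toy logical plan.
# All names here are hypothetical, not Kyuubi APIs.

def column_prune(referenced, input_set):
    """Keep only the referenced columns that the child actually produces."""
    return [c for c in referenced if c in input_set]

def build_query(node, privilege_objects, projection):
    if node["type"] == "relation":
        # Leaf table: record the pruned columns (fall back to all columns
        # when nothing was projected, mirroring the empty-projection case).
        cols = column_prune(projection, set(node["columns"])) or node["columns"]
        privilege_objects.append((node["table"], sorted(set(cols))))
        return
    # Generic case: no per-node-type branch; combine the upper projection
    # with this node's own references, prune against each child's output.
    referenced = projection + node.get("references", [])
    for child in node["children"]:
        build_query(child, privilege_objects,
                    column_prune(referenced, set(child["output"])))

# A MapInPandas-like node referencing id/name of the underlying table.
plan = {
    "type": "map_in_pandas",
    "references": ["id", "name"],
    "children": [{
        "type": "relation",
        "table": "default.test_mapinpandas",
        "columns": ["id", "name"],
        "output": ["id", "name"],
        "children": [],
    }],
}

objs = []
build_query(plan, objs, projection=[])
print(objs)  # the relation's columns are no longer missed
```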
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    For the same case:
    ```
    def filter_func(iterator):
        for pdf in iterator:
            yield pdf[pdf.id == 1]

    df = spark.read.table("test_mapinpandas")
    execute_result = df.mapInPandas(filter_func, df.schema).show()
    ```
    
    We miss the column info of table `test_mapinpandas`.
    
    #### Behavior With This Pull Request :tada:
    We get the privilege object of table `test_mapinpandas` with its column info.
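The before/after difference can be sketched as a toy contrast (plain Python; `old_collect`, `new_collect`, and the node-type names are illustrative, not actual Kyuubi code): the old builder only collected columns for explicitly matched node types, so `MapInPandas` contributed none, while the new builder derives columns from any node's references.

```python
# Toy contrast between per-node-type dispatch and unified handling.
# All names here are illustrative, not actual Kyuubi code.

KNOWN_NODES = {"project", "filter", "aggregate", "sort", "window", "join"}

def old_collect(node_type, references):
    # Old behavior: unmatched nodes (e.g. map_in_pandas) contribute no columns.
    return references if node_type in KNOWN_NODES else []

def new_collect(node_type, references):
    # New behavior: every node's references survive into column pruning.
    return references

refs = ["id", "name"]
print(old_collect("map_in_pandas", refs))  # [] -> columns missed
print(new_collect("map_in_pandas", refs))  # ['id', 'name']
```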
    
    #### Related Unit Tests
    
    ---
    
    # Checklists
    ## ๐Ÿ“ Author Self Checklist
    
    - [x] My code follows the [style guidelines](https://kyuubi.readthedocs.io/en/master/contributing/code/style.html) of this project
    - [x] I have performed a self-review
    - [x] I have commented my code, particularly in hard-to-understand areas
    - [x] I have made corresponding changes to the documentation
    - [x] My changes generate no new warnings
    - [x] I have added tests that prove my fix is effective or that my feature works
    - [x] New and existing unit tests pass locally with my changes
    - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    ## ๐Ÿ“ Committer Pre-Merge Checklist
    
    - [x] Pull request title is okay.
    - [x] No license issues.
    - [x] Milestone correctly set?
    - [x] Test coverage is ok
    - [x] Assignees are selected.
    - [x] Minimum number of approvals
    - [x] No changes are requested
    
    **Be nice. Be informative.**
    
    Closes #5787 from AngersZhuuuu/KYUUBI-5594-approach2.
    
    Closes #5594
    
    e08545599 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
    49f09fb0a [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
    4781f75b9 [Angerszhuuuu] Update PrivilegesBuilderSuite.scala
    9e9208d38 [Angerszhuuuu] Update V2JdbcTableCatalogRangerSparkExtensionSuite.scala
    626d3dd88 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
    3d69997de [Angerszhuuuu] Update PrivilegesBuilderSuite.scala
    6eb4b8e1a [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
    61efb8ae3 [Angerszhuuuu] update
    794ebb7be [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2
    a236da86b [Angerszhuuuu] Update PrivilegesBuilderSuite.scala
    74bd3f4d5 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
    4acbc4276 [Angerszhuuuu] Merge branch 'KYUUBI-5594-approach2' of https://github.com/AngersZhuuuu/incubator-kyuubi into KYUUBI-5594-approach2
    266f7e877 [Angerszhuuuu] update
    a6c784546 [Angerszhuuuu] Update PrivilegesBuilder.scala
    d785d5fdf [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2
    014ef3b84 [Angerszhuuuu] Update PrivilegesBuilder.scala
    7e1cd37a1 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2
    71d266162 [Angerszhuuuu] update
    db9594170 [Angerszhuuuu] update
    490eb95c2 [Angerszhuuuu] update
    70d110e89 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594-approach2
    e6a587718 [Angerszhuuuu] Update PrivilegesBuilder.scala
    5ff22b103 [Angerszhuuuu] Update PrivilegesBuilder.scala
    e6843014b [Angerszhuuuu] Update PrivilegesBuilder.scala
    594b202f7 [Angerszhuuuu] Update PrivilegesBuilder.scala
    2f87c61e1 [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
    1de8c1c68 [Angerszhuuuu] Update PrivilegesBuilder.scala
    ad17255d7 [Angerszhuuuu] Update PrivilegesBuilderSuite.scala
    4f5e8505f [Angerszhuuuu] update
    64349ed97 [Angerszhuuuu] Update PrivilegesBuilder.scala
    11b7a4c13 [Angerszhuuuu] Update PrivilegesBuilder.scala
    9a58fb0c4 [Angerszhuuuu] update
    d0b022ec9 [Angerszhuuuu] Update RuleApplyPermanentViewMarker.scala
    e0f28a640 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594
    0ebdd5de5 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594
    8e53236ac [Angerszhuuuu] update
    3bafa7ca5 [Angerszhuuuu] update
    d6e984e07 [Angerszhuuuu] update
    b00bf5e20 [Angerszhuuuu] Update PrivilegesBuilder.scala
    821422852 [Angerszhuuuu] update
    93fc6892b [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594
    04184e39d [Angerszhuuuu] update
    0bb762467 [Angerszhuuuu] Revert "Revert "Update PrivilegesBuilder.scala""
    f481283ae [Angerszhuuuu] Revert "Update PrivilegesBuilder.scala"
    9f871822f [Angerszhuuuu] Revert "Update PrivilegesBuilder.scala"
    29b67c457 [Angerszhuuuu] Update PrivilegesBuilder.scala
    8785ad1ab [Angerszhuuuu] Update PrivilegesBuilder.scala
    270f21dcc [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
    60872efcb [Angerszhuuuu] Update RangerSparkExtensionSuite.scala
    c34f32ea2 [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594
    86fc4756a [Angerszhuuuu] Update PrivilegesBuilder.scala
    404f1ea4c [Angerszhuuuu] Update PrivilegesBuilder.scala
    dcca394e0 [Angerszhuuuu] Update PrivilegesBuilder.scala
    c2c6fa447 [Angerszhuuuu] Update PrivilegesBuilder.scala
    6f6a36e5b [Angerszhuuuu] Merge branch 'master' into KYUUBI-5594]-AUTH]BuildQuery-should-respect-normal-node's-input
    4dd47a124 [Angerszhuuuu] update
    c549b6a1a [Angerszhuuuu] update
    80013b981 [Angerszhuuuu] Update PrivilegesBuilder.scala
    3cbba422a [Angerszhuuuu] Update PrivilegesBuilder.scala
    
    Authored-by: Angerszhuuuu <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../plugin/spark/authz/PrivilegesBuilder.scala     |  93 +++++++-----
 .../spark/authz/PrivilegesBuilderSuite.scala       |  44 +++---
 .../spark/authz/V2CommandsPrivilegesSuite.scala    |   4 +-
 .../DeltaCatalogRangerSparkExtensionSuite.scala    |  33 +++--
 .../HudiCatalogRangerSparkExtensionSuite.scala     |   2 +-
 .../IcebergCatalogRangerSparkExtensionSuite.scala  | 126 +++++++++-------
 .../authz/ranger/RangerSparkExtensionSuite.scala   | 163 +++++++++++----------
 ...JdbcTableCatalogRangerSparkExtensionSuite.scala | 155 ++++++++++----------
 8 files changed, 337 insertions(+), 283 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index 2d452ba9d..01266eb2c 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.kyuubi.plugin.spark.authz
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, 
NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.slf4j.LoggerFactory
@@ -69,44 +69,20 @@ object PrivilegesBuilder {
       if (projectionList.isEmpty) {
         privilegeObjects += PrivilegeObject(table, plan.output.map(_.name))
       } else {
-        val cols = (projectionList ++ conditionList).flatMap(collectLeaves)
-          .filter(plan.outputSet.contains).map(_.name).distinct
-        privilegeObjects += PrivilegeObject(table, cols)
+        val cols = columnPrune(projectionList ++ conditionList, plan.outputSet)
+        privilegeObjects += PrivilegeObject(table, cols.map(_.name).distinct)
       }
     }
 
+    def columnPrune(projectionList: Seq[Expression], output: AttributeSet): 
Seq[NamedExpression] = {
+      (projectionList ++ conditionList)
+        .flatMap(collectLeaves)
+        .filter(output.contains)
+    }
+
     plan match {
       case p if p.getTagValue(KYUUBI_AUTHZ_TAG).nonEmpty =>
 
-      case p: Project => buildQuery(p.child, privilegeObjects, p.projectList, 
conditionList, spark)
-
-      case j: Join =>
-        val cols =
-          conditionList ++ j.condition.map(expr => 
collectLeaves(expr)).getOrElse(Nil)
-        buildQuery(j.left, privilegeObjects, projectionList, cols, spark)
-        buildQuery(j.right, privilegeObjects, projectionList, cols, spark)
-
-      case f: Filter =>
-        val cols = conditionList ++ collectLeaves(f.condition)
-        buildQuery(f.child, privilegeObjects, projectionList, cols, spark)
-
-      case w: Window =>
-        val orderCols = w.orderSpec.flatMap(orderSpec => 
collectLeaves(orderSpec))
-        val partitionCols = w.partitionSpec.flatMap(partitionSpec => 
collectLeaves(partitionSpec))
-        val cols = conditionList ++ orderCols ++ partitionCols
-        buildQuery(w.child, privilegeObjects, projectionList, cols, spark)
-
-      case s: Sort =>
-        val sortCols = s.order.flatMap(sortOrder => collectLeaves(sortOrder))
-        val cols = conditionList ++ sortCols
-        buildQuery(s.child, privilegeObjects, projectionList, cols, spark)
-
-      case a: Aggregate =>
-        val aggCols =
-          (a.aggregateExpressions ++ a.groupingExpressions).flatMap(e => 
collectLeaves(e))
-        val cols = conditionList ++ aggCols
-        buildQuery(a.child, privilegeObjects, projectionList, cols, spark)
-
       case scan if isKnownScan(scan) && scan.resolved =>
         val tables = getScanSpec(scan).tables(scan, spark)
         // If the the scan is table-based, we check privileges on the table we 
found
@@ -125,7 +101,33 @@ object PrivilegesBuilder {
 
       case p =>
         for (child <- p.children) {
-          buildQuery(child, privilegeObjects, projectionList, conditionList, spark)
+          // If the current plan's references have no relation to its input, there are two cases:
+          //   1. `MapInPandas`, `ScriptTransformation`
+          //   2. a `Project` whose output only contains constant values
+          if (columnPrune(p.references.toSeq ++ p.output, p.inputSet).isEmpty) {
+            // If the plan is a `Project` whose output has no relation to its input, it can be ignored.
+            if (!p.isInstanceOf[Project]) {
+              buildQuery(
+                child,
+                privilegeObjects,
+                p.inputSet.map(_.toAttribute).toSeq,
+                Nil,
+                spark)
+            }
+          } else {
+            buildQuery(
+              child,
+              privilegeObjects,
+              // Here we use `projectionList ++ p.references` to do column pruning.
+              // For `Project` and `Aggregate`, the plan's output is covered by the plan's references.
+              // For `Filter`, `Sort`, etc., we rely on the upper `Project` node,
+              //   since we wrap a `Project` around the plan before calling `buildQuery()`.
+              // So pruning with the upper node's projectionList and the current node's
+              // references yields the correct columns.
+              columnPrune(projectionList ++ p.references.toSeq, p.inputSet).distinct,
+              conditionList ++ p.references,
+              spark)
+          }
         }
     }
   }
@@ -221,7 +223,26 @@ object PrivilegesBuilder {
               LOG.debug(ud.error(plan, e))
           }
         }
-        spec.queries(plan).foreach(buildQuery(_, inputObjs, spark = spark))
+        spec.queries(plan).foreach { p =>
+          if (p.resolved) {
+            buildQuery(Project(p.output, p), inputObjs, spark = spark)
+          } else {
+            try {
+              // On Spark 3.1, the query of some commands such as CreateTableAsSelect is unresolved.
+              // Before this PR we simply ignored it; now we support it.
+              val analyzed = spark.sessionState.analyzer.execute(p)
+              buildQuery(Project(analyzed.output, analyzed), inputObjs, spark 
= spark)
+            } catch {
+              case e: Exception =>
+                LOG.debug(
+                  s"""
+                     |Failed to analyze unresolved
+                     |$p
+                     |due to ${e.getMessage}""".stripMargin,
+                  e)
+            }
+          }
+        }
         spec.operationType
 
       case classname if FUNCTION_COMMAND_SPECS.contains(classname) =>
@@ -315,7 +336,7 @@ object PrivilegesBuilder {
       case cmd: Command => buildCommand(cmd, inputObjs, outputObjs, spark)
       // Queries
       case _ =>
-        buildQuery(plan, inputObjs, spark = spark)
+        buildQuery(Project(plan.output, plan), inputObjs, spark = spark)
         OperationType.QUERY
     }
     (inputObjs, outputObjs, opType)
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
index 673a2e437..d8b672a56 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala
@@ -59,11 +59,15 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
   protected def checkColumns(plan: LogicalPlan, cols: Seq[String]): Unit = {
     val (in, out, _) = PrivilegesBuilder.build(plan, spark)
     assert(out.isEmpty, "Queries shall not check output privileges")
-    val po = in.head
-    assert(po.actionType === PrivilegeObjectActionType.OTHER)
-    assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
-    assert(po.columns === cols)
-    checkTableOwner(po)
+    if (in.nonEmpty) {
+      val po = in.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assert(po.columns === cols)
+      checkTableOwner(po)
+    } else {
+      assert(cols.isEmpty)
+    }
   }
 
   protected def checkColumns(query: String, cols: Seq[String]): Unit = {
@@ -365,7 +369,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assertEqualsIgnoreCase(reusedPartTableShort)(po0.objectName)
     if (isSparkV32OrGreater) {
       // Query in AlterViewAsCommand can not be resolved before SPARK-34698
-      assert(po0.columns === Seq("key", "value", "pid"))
+      assert(po0.columns === Seq("key", "pid", "value"))
       checkTableOwner(po0)
     }
     val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
@@ -526,12 +530,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
     assertEqualsIgnoreCase(reusedDb)(po0.dbname)
     assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
-    if (isSparkV32OrGreater) {
-      assert(po0.columns.head === "key")
-      checkTableOwner(po0)
-    } else {
-      assert(po0.columns.isEmpty)
-    }
+    assert(po0.columns.head === "key")
+    checkTableOwner(po0)
     val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
     assert(accessType0 === AccessType.SELECT)
 
@@ -549,12 +549,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
     assertEqualsIgnoreCase(reusedDb)(po0.dbname)
     assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
-    if (isSparkV32OrGreater) {
-      assert(po0.columns === Seq("key", "value"))
-      checkTableOwner(po0)
-    } else {
-      assert(po0.columns.isEmpty)
-    }
+    assert(po0.columns === Seq("key", "value"))
+    checkTableOwner(po0)
     val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
     assert(accessType0 === AccessType.SELECT)
 
@@ -1050,7 +1046,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
       assertEqualsIgnoreCase(reusedDb)(po.dbname)
       assertStartsWithIgnoreCase(reusedTableShort)(po.objectName)
       assert(
-        po.columns === Seq("value", "pid", "key"),
+        po.columns === Seq("value", "key", "pid"),
         s"$reusedPartTable both 'key', 'value' and 'pid' should be 
authenticated")
       checkTableOwner(po)
       val accessType = ranger.AccessType(po, operationType, isInput = true)
@@ -1107,7 +1103,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
       assertEqualsIgnoreCase(reusedDb)(po.dbname)
       assertStartsWithIgnoreCase(reusedTableShort)(po.objectName)
       assert(
-        po.columns === Seq("key", "value"),
+        po.columns.sorted === Seq("key", "value").sorted,
         s"$reusedPartTable 'key' is the join key and 'pid' is omitted")
       checkTableOwner(po)
       val accessType = ranger.AccessType(po, operationType, isInput = true)
@@ -1218,7 +1214,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
       assertEqualsIgnoreCase(reusedDb)(po.dbname)
       assertStartsWithIgnoreCase(reusedTableShort)(po.objectName)
       assert(
-        po.columns === Seq("key", "value", "pid"),
+        po.columns === Seq("key", "pid", "value"),
         s"$reusedPartTable both 'key', 'value' and 'pid' should be 
authenticated")
       checkTableOwner(po)
       val accessType = ranger.AccessType(po, operationType, isInput = true)
@@ -1625,7 +1621,7 @@ class HiveCatalogPrivilegeBuilderSuite extends 
PrivilegesBuilderSuite {
     assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
     assertEqualsIgnoreCase(reusedDb)(po0.dbname)
     assert(po0.objectName equalsIgnoreCase reusedPartTable.split("\\.").last)
-    assert(po0.columns === Seq("key", "value", "pid"))
+    assert(po0.columns === Seq("key", "pid", "value"))
     checkTableOwner(po0)
     val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
     assert(accessType0 === AccessType.SELECT)
@@ -1721,7 +1717,7 @@ class HiveCatalogPrivilegeBuilderSuite extends 
PrivilegesBuilderSuite {
     assert(out1.isEmpty)
     val pi1 = in1.head
     assert(pi1.columns.size === 3)
-    assert(pi1.columns === Seq("key", "value", "pid"))
+    assert(pi1.columns === Seq("key", "pid", "value"))
 
     // case2: Some columns are involved, and the group column is not selected.
     val plan2 = sql(s"SELECT COUNT(key) FROM $reusedPartTable GROUP BY pid")
@@ -1741,7 +1737,7 @@ class HiveCatalogPrivilegeBuilderSuite extends 
PrivilegesBuilderSuite {
     assert(out3.isEmpty)
     val pi3 = in3.head
     assert(pi3.columns.size === 2)
-    assert(pi3.columns === Seq("key", "pid"))
+    assert(pi3.columns === Seq("pid", "key"))
 
     // case4: HAVING & GROUP clause
     val plan4 = sql(s"SELECT COUNT(key) FROM $reusedPartTable GROUP BY pid 
HAVING MAX(key) > 1000")
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
index 62b7939b3..1b6e07b77 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/V2CommandsPrivilegesSuite.scala
@@ -127,7 +127,7 @@ abstract class V2CommandsPrivilegesSuite extends 
PrivilegesBuilderSuite {
       assert(po0.catalog.isEmpty)
       assertEqualsIgnoreCase(reusedDb)(po0.dbname)
       assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
-      assert(po0.columns.take(2) === Seq("key", "value"))
+      assert(po0.columns === Seq("a", "key", "value"))
       checkTableOwner(po0)
 
       assert(outputs.size === 1)
@@ -186,7 +186,7 @@ abstract class V2CommandsPrivilegesSuite extends 
PrivilegesBuilderSuite {
       assert(po0.catalog.isEmpty)
       assertEqualsIgnoreCase(reusedDb)(po0.dbname)
       assertEqualsIgnoreCase(reusedTableShort)(po0.objectName)
-      assert(po0.columns.take(2) === Seq("key", "value"))
+      assert(po0.columns === Seq("a", "key", "value"))
       checkTableOwner(po0)
 
       assert(outputs.size === 1)
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
index 1ce8ad676..dbf88d7d0 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
@@ -215,8 +215,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite {
           s" SELECT * FROM $namespace1.$table2"
         interceptEndsWith[AccessControlException](
           doAs(someone, sql(insertIntoSql)))(
-          s"does not have [select] privilege on 
[$namespace1/$table2/id,$namespace1/$table2/name," +
-            s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+          s"does not have [select] privilege on " +
+            s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," +
+            s"$namespace1/$table2/id,$namespace1/$table2/name]," +
             s" [update] privilege on [$namespace1/$table1]")
         doAs(admin, sql(insertIntoSql))
 
@@ -225,8 +226,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite {
           s" SELECT * FROM $namespace1.$table2"
         interceptEndsWith[AccessControlException](
           doAs(someone, sql(insertOverwriteSql)))(
-          s"does not have [select] privilege on 
[$namespace1/$table2/id,$namespace1/$table2/name," +
-            s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+          s"does not have [select] privilege on " +
+            s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," +
+            s"$namespace1/$table2/id,$namespace1/$table2/name]," +
             s" [update] privilege on [$namespace1/$table1]")
         doAs(admin, sql(insertOverwriteSql))
       }
@@ -283,8 +285,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite {
              |""".stripMargin
         interceptEndsWith[AccessControlException](
           doAs(someone, sql(mergeIntoSql)))(
-          s"does not have [select] privilege on 
[$namespace1/$table2/id,$namespace1/$table2/name," +
-            s"$namespace1/$table2/gender,$namespace1/$table2/birthDate]," +
+          s"does not have [select] privilege on " +
+            s"[$namespace1/$table2/birthDate,$namespace1/$table2/gender," +
+            s"$namespace1/$table2/id,$namespace1/$table2/name]," +
             s" [update] privilege on [$namespace1/$table1]")
         doAs(admin, sql(mergeIntoSql))
       }
@@ -378,9 +381,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite {
           val insertIntoSql = s"INSERT INTO delta.`$path` SELECT * FROM 
$namespace1.$table2"
           interceptEndsWith[AccessControlException](
             doAs(someone, sql(insertIntoSql)))(
-            s"does not have [select] privilege on [$namespace1/$table2/id," +
-              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
-              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, 
$path/]]")
+            s"does not have [select] privilege on 
[$namespace1/$table2/birthDate," +
+              s"$namespace1/$table2/gender,$namespace1/$table2/id," +
+              s"$namespace1/$table2/name], [write] privilege on [[$path, 
$path/]]")
           doAs(admin, sql(insertIntoSql))
 
           // insert overwrite
@@ -388,9 +391,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite {
             s"INSERT OVERWRITE delta.`$path` SELECT * FROM $namespace1.$table2"
           interceptEndsWith[AccessControlException](
             doAs(someone, sql(insertOverwriteSql)))(
-            s"does not have [select] privilege on [$namespace1/$table2/id," +
-              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
-              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, 
$path/]]")
+            s"does not have [select] privilege on 
[$namespace1/$table2/birthDate," +
+              s"$namespace1/$table2/gender,$namespace1/$table2/id," +
+              s"$namespace1/$table2/name], [write] privilege on [[$path, 
$path/]]")
           doAs(admin, sql(insertOverwriteSql))
         })
       }
@@ -433,9 +436,9 @@ class DeltaCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite {
                |""".stripMargin
           interceptEndsWith[AccessControlException](
             doAs(someone, sql(mergeIntoSql)))(
-            s"does not have [select] privilege on [$namespace1/$table2/id," +
-              s"$namespace1/$table2/name,$namespace1/$table2/gender," +
-              s"$namespace1/$table2/birthDate], [write] privilege on [[$path, 
$path/]]")
+            s"does not have [select] privilege on 
[$namespace1/$table2/birthDate," +
+              s"$namespace1/$table2/gender,$namespace1/$table2/id," +
+              s"$namespace1/$table2/name], [write] privilege on [[$path, 
$path/]]")
           doAs(admin, sql(mergeIntoSql))
         })
       }
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
index a7c0c7662..72d4130ef 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/HudiCatalogRangerSparkExtensionSuite.scala
@@ -507,7 +507,7 @@ class HudiCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite {
         interceptEndsWith[AccessControlException] {
           doAs(someone, sql(mergeIntoSQL))
         }(s"does not have [select] privilege on " +
-          
s"[$namespace1/$table2/id,$namespace1/$table2/name,$namespace1/$table2/city], " 
+
+          
s"[$namespace1/$table2/city,$namespace1/$table2/id,$namespace1/$table2/name], " 
+
           s"[update] privilege on [$namespace1/$table1]")
         doAs(admin, sql(mergeIntoSQL))
       }
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
index 677b3945d..cf798cdfa 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -93,73 +93,87 @@ class IcebergCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite
   }
 
   test("[KYUUBI #3515] MERGE INTO") {
-    val mergeIntoSql =
-      s"""
-         |MERGE INTO $catalogV2.$bobNamespace.$bobSelectTable AS target
-         |USING $catalogV2.$namespace1.$table1  AS source
-         |ON target.id = source.id
-         |WHEN MATCHED AND (target.name='delete') THEN DELETE
-         |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city 
= source.city
+    withSingleCallEnabled {
+      val mergeIntoSql =
+        s"""
+           |MERGE INTO $catalogV2.$bobNamespace.$bobSelectTable AS target
+           |USING $catalogV2.$namespace1.$table1  AS source
+           |ON target.id = source.id
+           |WHEN MATCHED AND (target.name='delete') THEN DELETE
+           |WHEN MATCHED AND (target.name='update') THEN UPDATE SET 
target.city = source.city
       """.stripMargin
 
-    // MergeIntoTable:  Using a MERGE INTO Statement
-    val e1 = intercept[AccessControlException](
-      doAs(
-        someone,
-        sql(mergeIntoSql)))
-    assert(e1.getMessage.contains(s"does not have [select] privilege" +
-      s" on [$namespace1/$table1/id]"))
-
-    withSingleCallEnabled {
-      interceptEndsWith[AccessControlException](doAs(someone, 
sql(mergeIntoSql)))(
-        if (isSparkV35OrGreater) {
-          s"does not have [select] privilege on [$namespace1/table1/id" +
-            s",$namespace1/$table1/name,$namespace1/$table1/city]"
-        } else {
-          "does not have " +
-            s"[select] privilege on 
[$namespace1/$table1/id,$namespace1/$table1/name,$namespace1/$table1/city]," +
-            s" [update] privilege on [$bobNamespace/$bobSelectTable]"
-        })
+      // MergeIntoTable:  Using a MERGE INTO Statement
+      val e1 = intercept[AccessControlException](
+        doAs(
+          someone,
+          sql(mergeIntoSql)))
+      assert(e1.getMessage.contains(s"does not have [select] privilege" +
+        s" on 
[$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
+
+      withSingleCallEnabled {
+        interceptEndsWith[AccessControlException](doAs(someone, 
sql(mergeIntoSql)))(
+          if (isSparkV35OrGreater) {
+            s"does not have [select] privilege on [$namespace1/table1/city" +
+              s",$namespace1/$table1/id,$namespace1/$table1/name]"
+          } else {
+            "does not have " +
+              s"[select] privilege on 
[$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]," +
+              s" [update] privilege on [$bobNamespace/$bobSelectTable]"
+          })
+
+        interceptEndsWith[AccessControlException] {
+          doAs(bob, sql(mergeIntoSql))
+        }(s"does not have [update] privilege on 
[$bobNamespace/$bobSelectTable]")
+      }
 
-      interceptEndsWith[AccessControlException] {
-        doAs(bob, sql(mergeIntoSql))
-      }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
+      doAs(admin, sql(mergeIntoSql))
     }
-
-    doAs(admin, sql(mergeIntoSql))
   }
 
   test("[KYUUBI #3515] UPDATE TABLE") {
-    // UpdateTable
-    interceptEndsWith[AccessControlException] {
-      doAs(someone, sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou'  WHERE id=1"))
-    }(if (isSparkV35OrGreater) {
-      s"does not have [select] privilege on [$namespace1/$table1/id]"
-    } else {
-      s"does not have [update] privilege on [$namespace1/$table1]"
-    })
+    withSingleCallEnabled {
+      // UpdateTable
+      interceptEndsWith[AccessControlException] {
+        doAs(
+          someone,
+          sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou'  WHERE id=1"))
+      }(if (isSparkV35OrGreater) {
+        s"does not have [select] privilege on " +
+          s"[$namespace1/$table1/_file,$namespace1/$table1/_pos," +
+          s"$namespace1/$table1/id,$namespace1/$table1/name,$namespace1/$table1/city], " +
+          s"[update] privilege on [$namespace1/$table1]"
+      } else {
+        s"does not have [update] privilege on [$namespace1/$table1]"
+      })
 
-    doAs(
-      admin,
-      sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
-        " WHERE id=1"))
+      doAs(
+        admin,
+        sql(s"UPDATE $catalogV2.$namespace1.$table1 SET city='Guangzhou' " +
+          " WHERE id=1"))
+    }
   }
 
   test("[KYUUBI #3515] DELETE FROM TABLE") {
-    // DeleteFromTable
-    interceptEndsWith[AccessControlException] {
-      doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
-    }(if (isSparkV34OrGreater) {
-      s"does not have [select] privilege on [$namespace1/$table1/id]"
-    } else {
-      s"does not have [update] privilege on [$namespace1/$table1]"
-    })
-
-    interceptEndsWith[AccessControlException] {
-      doAs(bob, sql(s"DELETE FROM $catalogV2.$bobNamespace.$bobSelectTable WHERE id=2"))
-    }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
-
-    doAs(admin, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
+    withSingleCallEnabled {
+      // DeleteFromTable
+      interceptEndsWith[AccessControlException] {
+        doAs(someone, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
+      }(if (isSparkV34OrGreater) {
+        s"does not have [select] privilege on " +
+          s"[$namespace1/$table1/_file,$namespace1/$table1/_pos," +
+          s"$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name], " +
+          s"[update] privilege on [$namespace1/$table1]"
+      } else {
+        s"does not have [update] privilege on [$namespace1/$table1]"
+      })
+
+      interceptEndsWith[AccessControlException] {
+        doAs(bob, sql(s"DELETE FROM $catalogV2.$bobNamespace.$bobSelectTable WHERE id=2"))
+      }(s"does not have [update] privilege on [$bobNamespace/$bobSelectTable]")
+
+      doAs(admin, sql(s"DELETE FROM $catalogV2.$namespace1.$table1 WHERE id=2"))
+    }
   }
 
  test("[KYUUBI #3666] Support {OWNER} variable for queries run on CatalogV2") {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
index 43333ea77..6feb63eb9 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala
@@ -23,12 +23,15 @@ import java.nio.file.Path
 import scala.util.Try
 
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.sql.{Row, SparkSessionExtensions}
+import org.apache.spark.sql.{DataFrame, Row, SparkSessionExtensions}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
 import org.apache.spark.sql.catalyst.plans.logical.Statistics
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.scalatest.BeforeAndAfterAll
 // scalastyle:off
 import org.scalatest.funsuite.AnyFunSuite
@@ -555,11 +558,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
 
       val e2 = intercept[AccessControlException](
         doAs(someone, sql(s"CREATE VIEW $permView AS SELECT * FROM $table")))
-      if (isSparkV32OrGreater) {
-        assert(e2.getMessage.contains(s"does not have [select] privilege on [default/$table/id]"))
-      } else {
-        assert(e2.getMessage.contains(s"does not have [select] privilege on [$table]"))
-      }
+      assert(e2.getMessage.contains(s"does not have [select] privilege on [default/$table/id]"))
     }
   }
 
@@ -638,14 +637,12 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
         s" FROM $db1.$srcTable1 as tb1" +
         s" JOIN $db1.$srcTable2 as tb2" +
         s" on tb1.id = tb2.id"
-      val e1 = intercept[AccessControlException](doAs(someone, sql(insertSql1)))
-      assert(e1.getMessage.contains(s"does not have [select] privilege on [$db1/$srcTable1/id]"))
 
       withSingleCallEnabled {
-        val e2 = intercept[AccessControlException](doAs(someone, sql(insertSql1)))
-        assert(e2.getMessage.contains(s"does not have" +
+        val e = intercept[AccessControlException](doAs(someone, sql(insertSql1)))
+        assert(e.getMessage.contains(s"does not have" +
+        assert(e.getMessage.contains(s"does not have" +
           s" [select] privilege on" +
-          s" [$db1/$srcTable1/id,$db1/$srcTable1/name,$db1/$srcTable1/city," +
+          s" [$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name," +
           s"$db1/$srcTable2/age,$db1/$srcTable2/id]," +
          s" [update] privilege on [$db1/$sinkTable1/id,$db1/$sinkTable1/age," +
           s"$db1/$sinkTable1/name,$db1/$sinkTable1/city]"))
@@ -675,11 +672,13 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
           sql(s"CREATE TABLE IF NOT EXISTS $db1.$srcTable1" +
             s" (id int, name string, city string)"))
 
-        val e1 = intercept[AccessControlException](
-          doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from $db1.$srcTable1")))
-        assert(
-          e1.getMessage.contains(s"does not have [select] privilege on [$db1/$srcTable1/id]"))
-
+        withSingleCallEnabled {
+          val e1 = intercept[AccessControlException](
+            doAs(someone, sql(s"CACHE TABLE $cacheTable2 select * from $db1.$srcTable1")))
+          assert(
+            e1.getMessage.contains(s"does not have [select] privilege on " +
+              s"[$db1/$srcTable1/city,$db1/$srcTable1/id,$db1/$srcTable1/name]"))
+        }
         doAs(admin, sql(s"CACHE TABLE $cacheTable3 SELECT 1 AS a, 2 AS b "))
         doAs(someone, sql(s"CACHE TABLE $cacheTable4 select 1 as a, 2 as b "))
       }
@@ -888,9 +887,15 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
           doAs(
             someone,
            sql(s"SELECT id as new_id, name, max_scope FROM $db1.$view1".stripMargin).show()))
-        assert(e2.getMessage.contains(
-          s"does not have [select] privilege on " +
-            s"[$db1/$view1/id,$db1/$view1/name,$db1/$view1/max_scope]"))
+        if (isSparkV35OrGreater) {
+          assert(e2.getMessage.contains(
+            s"does not have [select] privilege on " +
+              s"[$db1/$view1/id,$db1/$view1/max_scope,$db1/$view1/name]"))
+        } else {
+          assert(e2.getMessage.contains(
+            s"does not have [select] privilege on " +
+              s"[$db1/$view1/name,$db1/$view1/id,$db1/$view1/max_scope]"))
+        }
       }
     }
   }
@@ -927,17 +932,11 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
                |AS
                |SELECT count(*) as cnt, sum(id) as sum_id FROM $db1.$table1
               """.stripMargin))
-        interceptEndsWith[AccessControlException](
-          doAs(someone, sql(s"SELECT count(*) FROM $db1.$table1").show()))(
-          s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope]")
+        checkAnswer(someone, s"SELECT count(*) FROM $db1.$table1", Row(0) :: Nil)
 
-        interceptEndsWith[AccessControlException](
-          doAs(someone, sql(s"SELECT count(*) FROM $db1.$view1").show()))(
-          s"does not have [select] privilege on [$db1/$view1/id,$db1/$view1/scope]")
+        checkAnswer(someone, s"SELECT count(*) FROM $db1.$view1", Row(0) :: Nil)
 
-        interceptEndsWith[AccessControlException](
-          doAs(someone, sql(s"SELECT count(*) FROM $db1.$view2").show()))(
-          s"does not have [select] privilege on [$db1/$view2/cnt,$db1/$view2/sum_id]")
+        checkAnswer(someone, s"SELECT count(*) FROM $db1.$view2", Row(1) :: Nil)
 
         interceptEndsWith[AccessControlException](
          doAs(someone, sql(s"SELECT count(id) FROM $db1.$table1 WHERE id > 10").show()))(
@@ -1321,7 +1320,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
           doAs(
             someone,
            sql(s"SELECT typeof(id), typeof(typeof(day)) FROM $db1.$table1").collect()))(
-          s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/day]")
+          s"does not have [select] privilege on [$db1/$table1/day,$db1/$table1/id]")
         interceptEndsWith[AccessControlException](
           doAs(
             someone,
@@ -1331,7 +1330,7 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
                  |typeof(cast(id as string)),
                  |typeof(substring(day, 1, 3))
                  |FROM $db1.$table1""".stripMargin).collect()))(
-          s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/day]")
+          s"does not have [select] privilege on [$db1/$table1/day,$db1/$table1/id]")
         checkAnswer(
           admin,
           s"""
@@ -1414,60 +1413,76 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
     }
   }
 
-  test("[KYUUBI #5884] PVM should inherit MultiInstance and wrap with new exprId") {
+  test("[KYUUBI #5594][AUTHZ] BuildQuery should respect normal node's input ") {
+    assume(!isSparkV35OrGreater, "mapInPandas not supported after spark 3.5")
     val db1 = defaultDb
     val table1 = "table1"
-    val perm_view = "perm_view"
     val view1 = "view1"
-    val view2 = "view2"
-    val view3 = "view3"
     withSingleCallEnabled {
-      withCleanTmpResources(Seq.empty) {
-        sql("set spark.sql.legacy.storeAnalyzedPlanForView=true")
-        doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1(id int, scope int)"))
-        doAs(admin, sql(s"CREATE VIEW $db1.$perm_view AS SELECT * FROM $db1.$table1"))
-
-        doAs(
-          admin,
-          sql(
-            s"""
-               |CREATE OR REPLACE TEMPORARY VIEW $view1 AS
-               |SELECT *
-               |FROM $db1.$perm_view
-               |WHERE id > 10
-               |""".stripMargin))
+      withCleanTmpResources(Seq((s"$db1.$table1", "table"), (s"$db1.$view1", "view"))) {
+        doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)"))
+        doAs(admin, sql(s"CREATE VIEW $db1.$view1 AS SELECT * FROM $db1.$table1"))
 
-        doAs(
-          admin,
-          sql(
-            s"""
-               |CREATE OR REPLACE TEMPORARY VIEW $view2 AS
-               |SELECT *
-               |FROM $view1
-               |WHERE scope <  10
-               |""".stripMargin))
+        val table = spark.read.table(s"$db1.$table1")
+        val mapTableInPandasUDF = PythonUDF(
+          "mapInPandasUDF",
+          null,
+          StructType(Seq(StructField("id", IntegerType), StructField("scope", IntegerType))),
+          table.queryExecution.analyzed.output,
+          205,
+          true)
+        interceptContains[AccessControlException](
+          doAs(
+            someone,
+            invokeAs(
+              table,
+              "mapInPandas",
+              (classOf[PythonUDF], mapTableInPandasUDF))
+              .asInstanceOf[DataFrame].select(col("id"), col("scope")).limit(1).show(true)))(
+          s"does not have [select] privilege on [$db1/$table1/id,$db1/$table1/scope]")
 
-        doAs(
-          admin,
-          sql(
-            s"""
-               |CREATE OR REPLACE TEMPORARY VIEW $view3 AS
-               |SELECT *
-               |FROM $view1
-               |WHERE scope is not null
-               |""".stripMargin))
+        val view = spark.read.table(s"$db1.$view1")
+        val mapViewInPandasUDF = PythonUDF(
+          "mapInPandasUDF",
+          null,
+          StructType(Seq(StructField("id", IntegerType), StructField("scope", IntegerType))),
+          view.queryExecution.analyzed.output,
+          205,
+          true)
+        interceptContains[AccessControlException](
+          doAs(
+            someone,
+            invokeAs(
+              view,
+              "mapInPandas",
+              (classOf[PythonUDF], mapViewInPandasUDF))
+              .asInstanceOf[DataFrame].select(col("id"), col("scope")).limit(1).show(true)))(
+          s"does not have [select] privilege on [$db1/$view1/id,$db1/$view1/scope]")
+      }
+    }
+  }
 
+  test("[KYUUBI #5594][AUTHZ] BuildQuery should respect sort agg input") {
+    val db1 = defaultDb
+    val table1 = "table1"
+    val view1 = "view1"
+    withSingleCallEnabled {
+      withCleanTmpResources(Seq((s"$db1.$table1", "table"), (s"$db1.$view1", "view"))) {
+        doAs(admin, sql(s"CREATE TABLE IF NOT EXISTS $db1.$table1 (id int, scope int)"))
+        doAs(admin, sql(s"CREATE VIEW $db1.$view1 AS SELECT * FROM $db1.$table1"))
+        checkAnswer(
+          someone,
+          s"SELECT count(*) FROM $db1.$table1 WHERE id > 1",
+          Row(0) :: Nil)
+        checkAnswer(
+          someone,
+          s"SELECT count(*) FROM $db1.$view1 WHERE id > 1",
+          Row(0) :: Nil)
         interceptContains[AccessControlException](
           doAs(
             someone,
-            sql(
-              s"""
-                 |SELECT a.*, b.scope as new_scope
-                 |FROM $view2 a
-                 |JOIN $view3 b
-                 |ON a.id == b.id
-                 |""".stripMargin).collect()))(s"does not have [select] privilege on " +
-          s"[$db1/$perm_view/id,$db1/$perm_view/scope,$db1/$perm_view/scope,$db1/$perm_view/id]")
+            sql(s"SELECT count(id) FROM $db1.$view1 WHERE id > 1").collect()))(
+          s"does not have [select] privilege on [$db1/$view1/id]")
       }
     }
   }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
index 046052d55..3a22f45d5 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/V2JdbcTableCatalogRangerSparkExtensionSuite.scala
@@ -107,20 +107,23 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu
   }
 
   test("[KYUUBI #3424] CREATE TABLE") {
-    // CreateTable
-    val e2 = intercept[AccessControlException](
-      doAs(someone, sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2")))
-    assert(e2.getMessage.contains(s"does not have [create] privilege" +
-      s" on [$namespace1/$table2]"))
+    withSingleCallEnabled {
+      // CreateTable
+      val e2 = intercept[AccessControlException](
+        doAs(someone, sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2")))
+      assert(e2.getMessage.contains(s"does not have [create] privilege" +
+        s" on [$namespace1/$table2]"))
 
-    // CreateTableAsSelect
-    val e21 = intercept[AccessControlException](
-      doAs(
-        someone,
-        sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2" +
-          s" AS select * from $catalogV2.$namespace1.$table1")))
-    assert(e21.getMessage.contains(s"does not have [select] privilege" +
-      s" on [$namespace1/$table1/id]"))
+      // CreateTableAsSelect
+
+      val e21 = intercept[AccessControlException](
+        doAs(
+          someone,
+          sql(s"CREATE TABLE IF NOT EXISTS $catalogV2.$namespace1.$table2" +
+            s" AS select * from $catalogV2.$namespace1.$table1")))
+      assert(e21.getMessage.contains(s"does not have [select] privilege" +
+        s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
+    }
   }
 
   test("[KYUUBI #3424] DROP TABLE") {
@@ -133,69 +136,74 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu
 
   test("[KYUUBI #3424] INSERT TABLE") {
     // AppendData: Insert Using a VALUES Clause
-    val e4 = intercept[AccessControlException](
-      doAs(
-        someone,
-        sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
-          s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
-    assert(e4.getMessage.contains(s"does not have [update] privilege" +
-      s" on [$namespace1/$outputTable1]"))
+    withSingleCallEnabled {
 
-    // AppendData: Insert Using a TABLE Statement
-    val e42 = intercept[AccessControlException](
-      doAs(
-        someone,
-        sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
-          s" TABLE $catalogV2.$namespace1.$table1")))
-    assert(e42.getMessage.contains(s"does not have [select] privilege" +
-      s" on [$namespace1/$table1/id]"))
+      val e4 = intercept[AccessControlException](
+        doAs(
+          someone,
+          sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
+            s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
+      assert(e4.getMessage.contains(s"does not have [update] privilege" +
+        s" on [$namespace1/$outputTable1]"))
 
-    // AppendData: Insert Using a SELECT Statement
-    val e43 = intercept[AccessControlException](
-      doAs(
-        someone,
-        sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
-          s" SELECT * from $catalogV2.$namespace1.$table1")))
-    assert(e43.getMessage.contains(s"does not have [select] privilege" +
-      s" on [$namespace1/$table1/id]"))
+      // AppendData: Insert Using a TABLE Statement
+      val e42 = intercept[AccessControlException](
+        doAs(
+          someone,
+          sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
+            s" TABLE $catalogV2.$namespace1.$table1")))
+      assert(e42.getMessage.contains(s"does not have [select] privilege" +
+        s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
 
-    // OverwriteByExpression: Insert Overwrite
-    val e44 = intercept[AccessControlException](
-      doAs(
-        someone,
-        sql(s"INSERT OVERWRITE $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
-          s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
-    assert(e44.getMessage.contains(s"does not have [update] privilege" +
-      s" on [$namespace1/$outputTable1]"))
+      // AppendData: Insert Using a SELECT Statement
+      val e43 = intercept[AccessControlException](
+        doAs(
+          someone,
+          sql(s"INSERT INTO $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
+            s" SELECT * from $catalogV2.$namespace1.$table1")))
+      assert(e43.getMessage.contains(s"does not have [select] privilege" +
+        s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
+
+      // OverwriteByExpression: Insert Overwrite
+      val e44 = intercept[AccessControlException](
+        doAs(
+          someone,
+          sql(s"INSERT OVERWRITE $catalogV2.$namespace1.$outputTable1 (id, name, city)" +
+            s" VALUES (1, 'bowenliang123', 'Guangzhou')")))
+      assert(e44.getMessage.contains(s"does not have [update] privilege" +
+        s" on [$namespace1/$outputTable1]"))
+    }
   }
 
   test("[KYUUBI #3424] MERGE INTO") {
-    val mergeIntoSql =
-      s"""
-         |MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target
-         |USING $catalogV2.$namespace1.$table1  AS source
-         |ON target.id = source.id
-         |WHEN MATCHED AND (target.name='delete') THEN DELETE
-         |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city
+    withSingleCallEnabled {
+      val mergeIntoSql =
+        s"""
+           |MERGE INTO $catalogV2.$namespace1.$outputTable1 AS target
+           |USING $catalogV2.$namespace1.$table1  AS source
+           |ON target.id = source.id
+           |WHEN MATCHED AND (target.name='delete') THEN DELETE
+           |WHEN MATCHED AND (target.name='update') THEN UPDATE SET target.city = source.city
       """.stripMargin
 
-    // MergeIntoTable:  Using a MERGE INTO Statement
-    val e1 = intercept[AccessControlException](
-      doAs(
-        someone,
-        sql(mergeIntoSql)))
-    assert(e1.getMessage.contains(s"does not have [select] privilege" +
-      s" on [$namespace1/$table1/id]"))
-
-    withSingleCallEnabled {
-      val e2 = intercept[AccessControlException](
+      // MergeIntoTable:  Using a MERGE INTO Statement
+      val e1 = intercept[AccessControlException](
         doAs(
           someone,
           sql(mergeIntoSql)))
-      assert(e2.getMessage.contains(s"does not have" +
-        s" [select] privilege" +
-        s" on [$namespace1/$table1/id,$namespace1/table1/name,$namespace1/$table1/city]," +
-        s" [update] privilege on [$namespace1/$outputTable1]"))
+      assert(e1.getMessage.contains(s"does not have [select] privilege" +
+        s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
+
+      withSingleCallEnabled {
+        val e2 = intercept[AccessControlException](
+          doAs(
+            someone,
+            sql(mergeIntoSql)))
+        assert(e2.getMessage.contains(s"does not have" +
+          s" [select] privilege" +
+          s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]," +
+          s" [update] privilege on [$namespace1/$outputTable1]"))
+      }
     }
   }
 
@@ -220,17 +228,14 @@ class V2JdbcTableCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSu
 
   test("[KYUUBI #3424] CACHE TABLE") {
     // CacheTable
-    val e7 = intercept[AccessControlException](
-      doAs(
-        someone,
-        sql(s"CACHE TABLE $cacheTable1" +
-          s" AS select * from $catalogV2.$namespace1.$table1")))
-    if (isSparkV32OrGreater) {
-      assert(e7.getMessage.contains(s"does not have [select] privilege" +
-        s" on [$namespace1/$table1/id]"))
-    } else {
+    withSingleCallEnabled {
+      val e7 = intercept[AccessControlException](
+        doAs(
+          someone,
+          sql(s"CACHE TABLE $cacheTable1" +
+            s" AS select * from $catalogV2.$namespace1.$table1")))
       assert(e7.getMessage.contains(s"does not have [select] privilege" +
-        s" on [$catalogV2.$namespace1/$table1]"))
+        s" on [$namespace1/$table1/city,$namespace1/$table1/id,$namespace1/$table1/name]"))
     }
   }
 
