This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 724ae9398 [KYUUBI #5248] [AUTHZ] Check privileges for Iceberg's CALL to RewriteDataFilesProcedure procedure
724ae9398 is described below

commit 724ae93989e7e64f858b5c621ef28e9b17e45f99
Author: yabola <[email protected]>
AuthorDate: Wed Sep 13 14:35:00 2023 +0800

    [KYUUBI #5248] [AUTHZ] Check privileges for Iceberg's CALL to RewriteDataFilesProcedure procedure
    
    ### _Why are the changes needed?_
    
    Before this PR, Kyuubi skipped authorization checks for Iceberg's CALL command. After this PR, the `alter` privilege is required to run it.
    This PR adds privilege checks for Iceberg's `rewrite_data_files` CALL command:
    - CALL ice.system.rewrite_data_files(table => 'ice.lcl.test_ice', options => map('min-input-files','2'))
    - other Iceberg CALL statements ( https://iceberg.apache.org/docs/latest/spark-procedures ) may also be covered, but this needs to be verified further. A follow-up phase is planned to support all documented procedures.
    
    When the `rewrite_data_files` command is called, there are two situations:
    1. When the triggering conditions are not met (such as input-files < min-input-files), only one logical plan is produced, and we need to check permissions on that single plan:
    ```
           * == Physical Plan 1 ==
           * Call (1)
    ```
    2. When the triggering conditions are met, two logical plans are triggered, and we need to check permissions on both:
    ```
           * == Physical Plan 1 ==
           * Call (1)
           *
           * == Physical Plan 2 ==
           * AppendData (3)
           * +- * ColumnarToRow (2)
           * +- BatchScan local.iceberg_ns.call_command_table (1)
    ```
    
    The second plan (`AppendData`) is a bit special and forced us to modify the table-name extraction logic in `DataSourceV2RelationTableExtractor`: in `AppendData`, `DataSourceV2Relation#identifier` holds the rewrite task id rather than the table name, so it is more appropriate to parse the table name from `DataSourceV2Relation#table`.
    <img width="1318" alt="image" src="https://github.com/apache/kyuubi/assets/31469905/efc815cb-8dfe-4a9f-8e24-d62883ef6eff">
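    The dotted-name handling introduced by the new `StringTableExtractor` in the patch below can be sketched as follows. This is a language-agnostic Python sketch of the 1/2/3-part splitting logic, not code from the repository; the `parse_table_name` helper name is illustrative:

    ```python
    def parse_table_name(qualified: str):
        """Split a dotted table name into (catalog, database, table).

        Mirrors the 1-, 2-, and 3-part handling in StringTableExtractor:
        parts that are absent from the qualified name come back as None.
        """
        parts = qualified.split(".")
        if len(parts) == 1:
            return (None, None, parts[0])
        elif len(parts) == 2:
            return (None, parts[0], parts[1])
        elif len(parts) == 3:
            return (parts[0], parts[1], parts[2])
        raise ValueError(f"unexpected table name: {qualified}")
    ```

    For example, `parse_table_name("local.iceberg_ns.call_command_table")` yields the catalog, namespace, and table name separately, which is what the authz plugin needs to match a Ranger policy.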
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
    
    ### _Was this patch authored or co-authored using generative AI tooling?_
    No.
    
    Closes #5248 from yabola/master.
    
    Closes #5248
    
    f3a2d736e [liangbowen] initDataFilesCount
    6c0da35f7 [Bowen Liang] skip input checks
    4a5ab6da4 [Bowen Liang] comment
    850100b6b [Bowen Liang] update
    c3158d25a [Bowen Liang] change operation type to ALTERTABLE_PROPERTIES
    8d5c8b1f8 [Bowen Liang] rewriteDataFiles2
    2d79e1730 [Bowen Liang] nit
    36f1d9d60 [chenliang.lu] update description for ut
    bd99e0dfa [chenliang.lu] update description for ut
    a95e1bb4f [Bowen Liang] nit
    7a0089907 [Bowen Liang] nit
    06566815b [Bowen Liang] revert opType
    1dd211b59 [Bowen Liang] withSingleCallEnabled
    657c9dcd3 [Bowen Liang] Merge branch 'master' into yabola/master
    cc7dda1f7 [Bowen Liang] update policy
    aecfd50cf [Bowen Liang] update table_spec
    66ddd89b1 [Bowen Liang] update
    b1e12789f [Bowen Liang] remove defaultBob
    b8f4fabaf [Bowen Liang] change operation type
    465fa48a2 [Bowen Liang] change ut to iceberg namespace
    0cfe285ea [Bowen Liang] comment
    2b835c943 [Bowen Liang] remove implicit casting in StringTableExtractor
    280ad029b [Bowen Liang] removed snapshotCommand tests
    83be1d0fd [Bowen Liang] removed snapshotCommand tests
    49bd8e14c [Bowen Liang] assertion on all required checks
    d65915eb9 [Bowen Liang] separate cases
    7513e055f [chenliang.lu] update ut
    2ac0e1141 [chenliang.lu] add more ut
    31a85a9a2 [Bowen Liang] rename ut
    a9773de2f [Bowen Liang] introduce TableTableExtractor
    da23d838f [Bowen Liang] input table
    45129bc3f [Bowen Liang] update
    adbe7535b [Bowen Liang] interceptContains
    f24e07d9a [Bowen Liang] update table_command_spec.json
    8a71a3c17 [Bowen Liang] rename to CallProcedure
    5c0fd5689 [Bowen Liang] rename uts
    c0c03176e [Bowen Liang] remove blanks
    da55dc597 [Bowen Liang] fix imports
    450f0779b [Bowen Liang] rename IcebergCallArgsTableExtractor to 
ExpressionSeqTableExtractor
    0deeffd57 [chenliang.lu] Support iceberg call statement in ranger 
authentication
    414ff2a42 [ChenliangLu] Merge branch 'apache:master' into master
    c85059a0b [chenliang.lu] fix comments
    85ffd0f88 [chenliang.lu] Support iceberg call statement in ranger 
authentication
    66dbfcc35 [chenliang.lu] optimize codes for table extractor
    f1e93fcd7 [chenliang.lu] Support iceberg call statement in ranger 
authentication
    
    Lead-authored-by: yabola <[email protected]>
    Co-authored-by: Bowen Liang <[email protected]>
    Co-authored-by: chenliang.lu <[email protected]>
    Co-authored-by: liangbowen <[email protected]>
    Co-authored-by: ChenliangLu <[email protected]>
    Signed-off-by: Bowen Liang <[email protected]>
---
 ....kyuubi.plugin.spark.authz.serde.TableExtractor |  3 ++
 .../src/main/resources/table_command_spec.json     | 14 +++++
 .../plugin/spark/authz/serde/tableExtractors.scala | 50 +++++++++++++++---
 .../spark/authz/gen/PolicyJsonFileGenerator.scala  |  2 +-
 .../IcebergCatalogPrivilegesBuilderSuite.scala     | 22 ++++++++
 .../plugin/spark/authz/gen/IcebergCommands.scala   |  8 +++
 .../IcebergCatalogRangerSparkExtensionSuite.scala  | 60 ++++++++++++++++++++++
 7 files changed, 152 insertions(+), 7 deletions(-)

diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
index a312682b1..78f836c65 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
@@ -18,9 +18,12 @@
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableOptionTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.ExpressionSeqTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.ResolvedIdentifierTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.ResolvedTableTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.StringTableExtractor
 org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.TableTableExtractor
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 0025fae75..e11d94400 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -91,6 +91,20 @@
     "fieldName" : "plan",
     "fieldExtractor" : "LogicalPlanQueryExtractor"
   } ]
+}, {
+  "classname" : "org.apache.spark.sql.catalyst.plans.logical.Call",
+  "tableDescs" : [ {
+    "fieldName" : "args",
+    "fieldExtractor" : "ExpressionSeqTableExtractor",
+    "columnDesc" : null,
+    "actionTypeDesc" : null,
+    "tableTypeDesc" : null,
+    "catalogDesc" : null,
+    "isInput" : false,
+    "setCurrentDatabaseIfMissing" : false
+  } ],
+  "opType" : "ALTERTABLE_PROPERTIES",
+  "queryDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.catalyst.plans.logical.CommentOnTable",
   "tableDescs" : [ {
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index 9a4435d5a..94641d6d0 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
@@ -132,6 +133,34 @@ class IdentifierTableExtractor extends TableExtractor {
   }
 }
 
+/**
+ * java.lang.String
+ * with concat parts by "."
+ */
+class StringTableExtractor extends TableExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+    val tableNameArr = v1.asInstanceOf[String].split("\\.")
+    val maybeTable = tableNameArr.length match {
+      case 1 => Table(None, None, tableNameArr(0), None)
+      case 2 => Table(None, Some(tableNameArr(0)), tableNameArr(1), None)
+      case 3 => Table(Some(tableNameArr(0)), Some(tableNameArr(1)), 
tableNameArr(2), None)
+    }
+    Option(maybeTable)
+  }
+}
+
+/**
+ * Seq[org.apache.spark.sql.catalyst.expressions.Expression]
+ */
+class ExpressionSeqTableExtractor extends TableExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+    val expressions = v1.asInstanceOf[Seq[Expression]]
+    // Iceberg will rearrange the parameters according to the parameter order
+    // defined in the procedure, where the table parameters are currently 
always the first.
+    lookupExtractor[StringTableExtractor].apply(spark, 
expressions.head.toString())
+  }
+}
+
 /**
  * org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
  */
@@ -145,12 +174,11 @@ class DataSourceV2RelationTableExtractor extends 
TableExtractor {
         val maybeCatalogPlugin = invokeAs[Option[AnyRef]](v2Relation, 
"catalog")
         val maybeCatalog = maybeCatalogPlugin.flatMap(catalogPlugin =>
           lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogPlugin))
-        val maybeIdentifier = invokeAs[Option[AnyRef]](v2Relation, 
"identifier")
-        maybeIdentifier.flatMap { id =>
-          val maybeTable = 
lookupExtractor[IdentifierTableExtractor].apply(spark, id)
-          val maybeOwner = TableExtractor.getOwner(v2Relation)
-          maybeTable.map(_.copy(catalog = maybeCatalog, owner = maybeOwner))
-        }
+        lookupExtractor[TableTableExtractor].apply(spark, 
invokeAs[AnyRef](v2Relation, "table"))
+          .map { table =>
+            val maybeOwner = TableExtractor.getOwner(v2Relation)
+            table.copy(catalog = maybeCatalog, owner = maybeOwner)
+          }
     }
   }
 }
@@ -198,3 +226,13 @@ class ResolvedIdentifierTableExtractor extends 
TableExtractor {
     }
   }
 }
+
+/**
+ * org.apache.spark.sql.connector.catalog.Table
+ */
+class TableTableExtractor extends TableExtractor {
+  override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+    val tableName = invokeAs[String](v1, "name")
+    lookupExtractor[StringTableExtractor].apply(spark, tableName)
+  }
+}
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala
index 981635a17..2686cba20 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala
@@ -28,9 +28,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.apache.commons.io.FileUtils
 import org.apache.ranger.plugin.model.RangerPolicy
-// scalastyle:off
 import org.scalatest.funsuite.AnyFunSuite
 
+// scalastyle:off
 import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
 import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
 import 
org.apache.kyuubi.plugin.spark.authz.gen.KRangerPolicyItemAccess.allowTypes
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
index db81cd6e8..45186e250 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
@@ -125,4 +125,26 @@ class IcebergCatalogPrivilegesBuilderSuite extends 
V2CommandsPrivilegesSuite {
       assert(accessType === AccessType.UPDATE)
     }
   }
+
+  test("RewriteDataFilesProcedure") {
+    val table = "RewriteDataFilesProcedure"
+    withV2Table(table) { tableId =>
+      sql(s"CREATE TABLE IF NOT EXISTS $tableId (key int, value String) USING 
iceberg")
+      sql(s"INSERT INTO $tableId VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      val plan = sql(s"CALL $catalogV2.system.rewrite_data_files (table => 
'$tableId')")
+        .queryExecution.analyzed
+      val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, 
spark)
+      assert(operationType === ALTERTABLE_PROPERTIES)
+      assert(inputs.size === 0)
+      assert(outputs.size === 1)
+      val po = outputs.head
+      assert(po.actionType === PrivilegeObjectActionType.OTHER)
+      assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+      assertEqualsIgnoreCase(namespace)(po.dbname)
+      assertEqualsIgnoreCase(table)(po.objectName)
+      val accessType = AccessType(po, operationType, isInput = false)
+      assert(accessType === AccessType.ALTER)
+    }
+  }
 }
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
index 208e73c51..355143c40 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.plugin.spark.authz.gen
 
+import org.apache.kyuubi.plugin.spark.authz.OperationType
 import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
 import org.apache.kyuubi.plugin.spark.authz.serde._
 
@@ -49,7 +50,14 @@ object IcebergCommands {
     TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDesc))
   }
 
+  val CallProcedure = {
+    val cmd = "org.apache.spark.sql.catalyst.plans.logical.Call"
+    val td = TableDesc("args", classOf[ExpressionSeqTableExtractor])
+    TableCommandSpec(cmd, Seq(td), opType = 
OperationType.ALTERTABLE_PROPERTIES)
+  }
+
   val data: Array[TableCommandSpec] = Array(
+    CallProcedure,
     DeleteFromIcebergTable,
     UpdateIcebergTable,
     MergeIntoIcebergTable,
diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
index b22a812fd..55fde3b68 100644
--- 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
+++ 
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -27,6 +27,7 @@ import 
org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
 import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
 import org.apache.kyuubi.tags.IcebergTest
+import org.apache.kyuubi.util.AssertionUtils._
 
 /**
  * Tests for RangerSparkExtensionSuite
@@ -226,4 +227,63 @@ class IcebergCatalogRangerSparkExtensionSuite extends 
RangerSparkExtensionSuite
     assert(e1.getMessage.contains(s"does not have [select] privilege" +
       s" on [$namespace1/$table1]"))
   }
+
+  test("CALL RewriteDataFilesProcedure") {
+    val tableName = "table_select_call_command_table"
+    val table = s"$catalogV2.$namespace1.$tableName"
+    val initDataFilesCount = 2
+    val rewriteDataFiles1 = s"CALL $catalogV2.system.rewrite_data_files " +
+      s"(table => '$table', options => 
map('min-input-files','$initDataFilesCount'))"
+    val rewriteDataFiles2 = s"CALL $catalogV2.system.rewrite_data_files " +
+      s"(table => '$table', options => 
map('min-input-files','${initDataFilesCount + 1}'))"
+
+    withCleanTmpResources(Seq((table, "table"))) {
+      doAs(
+        admin, {
+          sql(s"CREATE TABLE IF NOT EXISTS $table  (id int, name string) USING 
iceberg")
+          // insert 2 data files
+          (0 until initDataFilesCount)
+            .foreach(i => sql(s"INSERT INTO $table VALUES ($i, 'user_$i')"))
+        })
+
+      interceptContains[AccessControlException](doAs(someone, 
sql(rewriteDataFiles1)))(
+        s"does not have [alter] privilege on [$namespace1/$tableName]")
+      interceptContains[AccessControlException](doAs(someone, 
sql(rewriteDataFiles2)))(
+        s"does not have [alter] privilege on [$namespace1/$tableName]")
+
+      /**
+       * Case 1: Number of input data files equals or greater than minimum 
expected.
+       * Two logical plans triggered
+       * when ( input-files(2) >= min-input-files(2) ):
+       *
+       * == Physical Plan 1 ==
+       * Call (1)
+       *
+       * == Physical Plan 2 ==
+       * AppendData (3)
+       * +- * ColumnarToRow (2)
+       * +- BatchScan local.iceberg_ns.call_command_table (1)
+       */
+      doAs(
+        admin, {
+          val result1 = sql(rewriteDataFiles1).collect()
+          // rewritten results into 2 data files
+          assert(result1(0)(0) === initDataFilesCount)
+        })
+
+      /**
+       * Case 2: Number of input data files less than minimum expected.
+       * Only one logical plan triggered
+       * when ( input-files(2) < min-input-files(3) )
+       *
+       * == Physical Plan ==
+       *  Call (1)
+       */
+      doAs(
+        admin, {
+          val result2 = sql(rewriteDataFiles2).collect()
+          assert(result2(0)(0) === 0)
+        })
+    }
+  }
 }
