This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 724ae9398 [KYUUBI #5248] [AUTHZ] Check privileges for Iceberg's CALL to RewriteDataFilesProcedure procedure
724ae9398 is described below
commit 724ae93989e7e64f858b5c621ef28e9b17e45f99
Author: yabola <[email protected]>
AuthorDate: Wed Sep 13 14:35:00 2023 +0800
[KYUUBI #5248] [AUTHZ] Check privileges for Iceberg's CALL to RewriteDataFilesProcedure procedure
### _Why are the changes needed?_
Before this PR, Kyuubi skipped authorization checks for Iceberg's CALL command. After this PR, the `alter` permission is required to use it (see the sketch after the list below). This PR adds privilege checks for Iceberg's CALL `rewrite_data_files` command:
- `CALL ice.system.rewrite_data_files(table => 'ice.lcl.test_ice', options => map('min-input-files','2'))`
- Other Iceberg CALL statements (https://iceberg.apache.org/docs/latest/spark-procedures) may also be covered, but this needs further verification. A next phase is planned to support all procedures as documented.
When the `rewrite_data_files` command is called, there are two situations:
1. When the triggering conditions are not met (e.g. input-files < min-input-files), there is only one logical plan, and we check permissions on that single plan:
```
* == Physical Plan 1 ==
* Call (1)
```
2. When the triggering conditions are met, two logical plans are triggered, and we must check permissions on both of them:
```
* == Physical Plan 1 ==
* Call (1)
*
* == Physical Plan 2 ==
* AppendData (3)
* +- * ColumnarToRow (2)
* +- BatchScan local.iceberg_ns.call_command_table (1)
```
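Both situations can be reproduced roughly as follows (a sketch distilled from the new test; the catalog and table names are placeholders):
```
// Sketch: assumes a SparkSession with an Iceberg catalog registered as `ice`.
spark.sql("CREATE TABLE IF NOT EXISTS ice.db.t (id INT, name STRING) USING iceberg")
// Each INSERT writes a separate data file, giving 2 input files.
(0 until 2).foreach(i => spark.sql(s"INSERT INTO ice.db.t VALUES ($i, 'user_$i')"))

// Situation 2: input-files(2) >= min-input-files(2) => Call plan + AppendData plan.
spark.sql("CALL ice.system.rewrite_data_files(table => 'ice.db.t', " +
  "options => map('min-input-files','2'))").show()

// Situation 1: input-files(2) < min-input-files(3) => only the Call plan.
spark.sql("CALL ice.system.rewrite_data_files(table => 'ice.db.t', " +
  "options => map('min-input-files','3'))").show()
```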
The second plan (`AppendData`) is special and forces us to change how table names are extracted in `DataSourceV2RelationTableExtractor`: in `AppendData`, `DataSourceV2Relation#identifier` holds the rewrite task id rather than the table identifier, so it is more appropriate to parse the table name from `DataSourceV2Relation#table`.
![image](https://github.com/apache/kyuubi/assets/31469905/efc815cb-8dfe-4a9f-8e24-d62883ef6eff)
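Conceptually, the fix resolves the dotted name via `Table#name()` and splits it, roughly as below (a simplified sketch; the actual patch goes through the serde extractor framework and reflection, see `tableExtractors.scala` in the diff):
```
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

// Minimal stand-in for the plugin's serde Table(catalog, database, table, owner).
case class Table(
    catalog: Option[String],
    database: Option[String],
    table: String,
    owner: Option[String])

// In AppendData plans, relation.identifier may carry a rewrite task id,
// so derive the dotted name from relation.table.name() instead.
def tableFromV2Relation(relation: DataSourceV2Relation): Option[Table] =
  relation.table.name().split("\\.") match {
    case Array(tbl)          => Some(Table(None, None, tbl, None))
    case Array(db, tbl)      => Some(Table(None, Some(db), tbl, None))
    case Array(cat, db, tbl) => Some(Table(Some(cat), Some(db), tbl, None))
    case _                   => None
  }
```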
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5248 from yabola/master.
Closes #5248
f3a2d736e [liangbowen] initDataFilesCount
6c0da35f7 [Bowen Liang] skip input checks
4a5ab6da4 [Bowen Liang] comment
850100b6b [Bowen Liang] update
c3158d25a [Bowen Liang] change operation type to ALTERTABLE_PROPERTIES
8d5c8b1f8 [Bowen Liang] rewriteDataFiles2
2d79e1730 [Bowen Liang] nit
36f1d9d60 [chenliang.lu] update description for ut
bd99e0dfa [chenliang.lu] update description for ut
a95e1bb4f [Bowen Liang] nit
7a0089907 [Bowen Liang] nit
06566815b [Bowen Liang] revert opType
1dd211b59 [Bowen Liang] withSingleCallEnabled
657c9dcd3 [Bowen Liang] Merge branch 'master' into yabola/master
cc7dda1f7 [Bowen Liang] update policy
aecfd50cf [Bowen Liang] update table_spec
66ddd89b1 [Bowen Liang] update
b1e12789f [Bowen Liang] remove defaultBob
b8f4fabaf [Bowen Liang] change operation type
465fa48a2 [Bowen Liang] change ut to iceberg namespace
0cfe285ea [Bowen Liang] comment
2b835c943 [Bowen Liang] remove implicit casting in StringTableExtractor
280ad029b [Bowen Liang] removed snapshotCommand tests
83be1d0fd [Bowen Liang] removed snapshotCommand tests
49bd8e14c [Bowen Liang] assertion on all required checks
d65915eb9 [Bowen Liang] separate cases
7513e055f [chenliang.lu] update ut
2ac0e1141 [chenliang.lu] add more ut
31a85a9a2 [Bowen Liang] rename ut
a9773de2f [Bowen Liang] introduce TableTableExtractor
da23d838f [Bowen Liang] input table
45129bc3f [Bowen Liang] update
adbe7535b [Bowen Liang] interceptContains
f24e07d9a [Bowen Liang] update table_command_spec.json
8a71a3c17 [Bowen Liang] rename to CallProcedure
5c0fd5689 [Bowen Liang] rename uts
c0c03176e [Bowen Liang] remove blanks
da55dc597 [Bowen Liang] fix imports
450f0779b [Bowen Liang] rename IcebergCallArgsTableExtractor to ExpressionSeqTableExtractor
0deeffd57 [chenliang.lu] Support iceberg call statement in ranger authentication
414ff2a42 [ChenliangLu] Merge branch 'apache:master' into master
c85059a0b [chenliang.lu] fix comments
85ffd0f88 [chenliang.lu] Support iceberg call statement in ranger authentication
66dbfcc35 [chenliang.lu] optimize codes for table extractor
f1e93fcd7 [chenliang.lu] Support iceberg call statement in ranger authentication
Lead-authored-by: yabola <[email protected]>
Co-authored-by: Bowen Liang <[email protected]>
Co-authored-by: chenliang.lu <[email protected]>
Co-authored-by: liangbowen <[email protected]>
Co-authored-by: ChenliangLu <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
---
....kyuubi.plugin.spark.authz.serde.TableExtractor | 3 ++
.../src/main/resources/table_command_spec.json | 14 +++++
.../plugin/spark/authz/serde/tableExtractors.scala | 50 +++++++++++++++---
.../spark/authz/gen/PolicyJsonFileGenerator.scala | 2 +-
.../IcebergCatalogPrivilegesBuilderSuite.scala | 22 ++++++++
.../plugin/spark/authz/gen/IcebergCommands.scala | 8 +++
.../IcebergCatalogRangerSparkExtensionSuite.scala | 60 ++++++++++++++++++++++
7 files changed, 152 insertions(+), 7 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
index a312682b1..78f836c65 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.TableExtractor
@@ -18,9 +18,12 @@
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableOptionTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.ExpressionSeqTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.IdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.LogicalRelationTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedDbObjectNameTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedIdentifierTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.ResolvedTableTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.StringTableExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TableIdentifierTableExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.TableTableExtractor
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 0025fae75..e11d94400 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -91,6 +91,20 @@
"fieldName" : "plan",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
+}, {
+ "classname" : "org.apache.spark.sql.catalyst.plans.logical.Call",
+ "tableDescs" : [ {
+ "fieldName" : "args",
+ "fieldExtractor" : "ExpressionSeqTableExtractor",
+ "columnDesc" : null,
+ "actionTypeDesc" : null,
+ "tableTypeDesc" : null,
+ "catalogDesc" : null,
+ "isInput" : false,
+ "setCurrentDatabaseIfMissing" : false
+ } ],
+ "opType" : "ALTERTABLE_PROPERTIES",
+ "queryDescs" : [ ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.CommentOnTable",
"tableDescs" : [ {
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index 9a4435d5a..94641d6d0 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
@@ -132,6 +133,34 @@ class IdentifierTableExtractor extends TableExtractor {
}
}
+/**
+ * java.lang.String
+ * with concat parts by "."
+ */
+class StringTableExtractor extends TableExtractor {
+ override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+ val tableNameArr = v1.asInstanceOf[String].split("\\.")
+ val maybeTable = tableNameArr.length match {
+ case 1 => Table(None, None, tableNameArr(0), None)
+ case 2 => Table(None, Some(tableNameArr(0)), tableNameArr(1), None)
+ case 3 => Table(Some(tableNameArr(0)), Some(tableNameArr(1)), tableNameArr(2), None)
+ }
+ Option(maybeTable)
+ }
+}
+
+/**
+ * Seq[org.apache.spark.sql.catalyst.expressions.Expression]
+ */
+class ExpressionSeqTableExtractor extends TableExtractor {
+ override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+ val expressions = v1.asInstanceOf[Seq[Expression]]
+ // Iceberg will rearrange the parameters according to the parameter order
+ // defined in the procedure, where the table parameters are currently always the first.
+ lookupExtractor[StringTableExtractor].apply(spark, expressions.head.toString())
+ }
+}
+
/**
* org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
*/
@@ -145,12 +174,11 @@ class DataSourceV2RelationTableExtractor extends TableExtractor {
val maybeCatalogPlugin = invokeAs[Option[AnyRef]](v2Relation, "catalog")
val maybeCatalog = maybeCatalogPlugin.flatMap(catalogPlugin =>
lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogPlugin))
- val maybeIdentifier = invokeAs[Option[AnyRef]](v2Relation, "identifier")
- maybeIdentifier.flatMap { id =>
- val maybeTable = lookupExtractor[IdentifierTableExtractor].apply(spark, id)
- val maybeOwner = TableExtractor.getOwner(v2Relation)
- maybeTable.map(_.copy(catalog = maybeCatalog, owner = maybeOwner))
- }
+ lookupExtractor[TableTableExtractor].apply(spark, invokeAs[AnyRef](v2Relation, "table"))
+ .map { table =>
+ val maybeOwner = TableExtractor.getOwner(v2Relation)
+ table.copy(catalog = maybeCatalog, owner = maybeOwner)
+ }
}
}
}
@@ -198,3 +226,13 @@ class ResolvedIdentifierTableExtractor extends TableExtractor {
}
}
}
+
+/**
+ * org.apache.spark.sql.connector.catalog.Table
+ */
+class TableTableExtractor extends TableExtractor {
+ override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
+ val tableName = invokeAs[String](v1, "name")
+ lookupExtractor[StringTableExtractor].apply(spark, tableName)
+ }
+}
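(Aside, not part of the patch: a rough usage sketch of the new `StringTableExtractor` above, assuming a local SparkSession and the kyuubi-spark-authz classes on the classpath.)
```
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.plugin.spark.authz.serde.StringTableExtractor

val spark = SparkSession.builder().master("local[1]").getOrCreate()
val extractor = new StringTableExtractor
// 1, 2 or 3 dot-separated parts map to table / db.table / catalog.db.table.
assert(extractor.apply(spark, "test_ice").get.table == "test_ice")
assert(extractor.apply(spark, "lcl.test_ice").get.database.contains("lcl"))
assert(extractor.apply(spark, "ice.lcl.test_ice").get.catalog.contains("ice"))
```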
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala b/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala
index 981635a17..2686cba20 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala
@@ -28,9 +28,9 @@ import com.fasterxml.jackson.databind.node.ObjectNode
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.commons.io.FileUtils
import org.apache.ranger.plugin.model.RangerPolicy
-// scalastyle:off
import org.scalatest.funsuite.AnyFunSuite
+// scalastyle:off
import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.gen.KRangerPolicyItemAccess.allowTypes
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
index db81cd6e8..45186e250 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/IcebergCatalogPrivilegesBuilderSuite.scala
@@ -125,4 +125,26 @@ class IcebergCatalogPrivilegesBuilderSuite extends V2CommandsPrivilegesSuite {
assert(accessType === AccessType.UPDATE)
}
}
+
+ test("RewriteDataFilesProcedure") {
+ val table = "RewriteDataFilesProcedure"
+ withV2Table(table) { tableId =>
+ sql(s"CREATE TABLE IF NOT EXISTS $tableId (key int, value String) USING
iceberg")
+ sql(s"INSERT INTO $tableId VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+ val plan = sql(s"CALL $catalogV2.system.rewrite_data_files (table => '$tableId')")
+ .queryExecution.analyzed
+ val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, spark)
+ assert(operationType === ALTERTABLE_PROPERTIES)
+ assert(inputs.size === 0)
+ assert(outputs.size === 1)
+ val po = outputs.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assertEqualsIgnoreCase(namespace)(po.dbname)
+ assertEqualsIgnoreCase(table)(po.objectName)
+ val accessType = AccessType(po, operationType, isInput = false)
+ assert(accessType === AccessType.ALTER)
+ }
+ }
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
index 208e73c51..355143c40 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/IcebergCommands.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.plugin.spark.authz.gen
+import org.apache.kyuubi.plugin.spark.authz.OperationType
import org.apache.kyuubi.plugin.spark.authz.PrivilegeObjectActionType._
import org.apache.kyuubi.plugin.spark.authz.serde._
@@ -49,7 +50,14 @@ object IcebergCommands {
TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryDesc))
}
+ val CallProcedure = {
+ val cmd = "org.apache.spark.sql.catalyst.plans.logical.Call"
+ val td = TableDesc("args", classOf[ExpressionSeqTableExtractor])
+ TableCommandSpec(cmd, Seq(td), opType = OperationType.ALTERTABLE_PROPERTIES)
+ }
+
val data: Array[TableCommandSpec] = Array(
+ CallProcedure,
DeleteFromIcebergTable,
UpdateIcebergTable,
MergeIntoIcebergTable,
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
index b22a812fd..55fde3b68 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/IcebergCatalogRangerSparkExtensionSuite.scala
@@ -27,6 +27,7 @@ import org.apache.kyuubi.plugin.spark.authz.RangerTestNamespace._
import org.apache.kyuubi.plugin.spark.authz.RangerTestUsers._
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
import org.apache.kyuubi.tags.IcebergTest
+import org.apache.kyuubi.util.AssertionUtils._
/**
* Tests for RangerSparkExtensionSuite
@@ -226,4 +227,63 @@ class IcebergCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite
assert(e1.getMessage.contains(s"does not have [select] privilege" +
s" on [$namespace1/$table1]"))
}
+
+ test("CALL RewriteDataFilesProcedure") {
+ val tableName = "table_select_call_command_table"
+ val table = s"$catalogV2.$namespace1.$tableName"
+ val initDataFilesCount = 2
+ val rewriteDataFiles1 = s"CALL $catalogV2.system.rewrite_data_files " +
+ s"(table => '$table', options =>
map('min-input-files','$initDataFilesCount'))"
+ val rewriteDataFiles2 = s"CALL $catalogV2.system.rewrite_data_files " +
+ s"(table => '$table', options =>
map('min-input-files','${initDataFilesCount + 1}'))"
+
+ withCleanTmpResources(Seq((table, "table"))) {
+ doAs(
+ admin, {
+ sql(s"CREATE TABLE IF NOT EXISTS $table (id int, name string) USING
iceberg")
+ // insert 2 data files
+ (0 until initDataFilesCount)
+ .foreach(i => sql(s"INSERT INTO $table VALUES ($i, 'user_$i')"))
+ })
+
+ interceptContains[AccessControlException](doAs(someone, sql(rewriteDataFiles1)))(
+ s"does not have [alter] privilege on [$namespace1/$tableName]")
+ interceptContains[AccessControlException](doAs(someone, sql(rewriteDataFiles2)))(
+ s"does not have [alter] privilege on [$namespace1/$tableName]")
+
+ /**
+ * Case 1: Number of input data files is equal to or greater than the minimum expected.
+ * Two logical plans triggered
+ * when ( input-files(2) >= min-input-files(2) ):
+ *
+ * == Physical Plan 1 ==
+ * Call (1)
+ *
+ * == Physical Plan 2 ==
+ * AppendData (3)
+ * +- * ColumnarToRow (2)
+ * +- BatchScan local.iceberg_ns.call_command_table (1)
+ */
+ doAs(
+ admin, {
+ val result1 = sql(rewriteDataFiles1).collect()
+ // rewritten results into 2 data files
+ assert(result1(0)(0) === initDataFilesCount)
+ })
+
+ /**
+ * Case 2: Number of input data files is less than the minimum expected.
+ * Only one logical plan triggered
+ * when ( input-files(2) < min-input-files(3) )
+ *
+ * == Physical Plan ==
+ * Call (1)
+ */
+ doAs(
+ admin, {
+ val result2 = sql(rewriteDataFiles2).collect()
+ assert(result2(0)(0) === 0)
+ })
+ }
+ }
}