This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
     new 23f32cf15 [KYUUBI #5690][AUTHZ] Support insert into/overwrite path-based table for Delta Lake in Authz
23f32cf15 is described below
commit 23f32cf1512fe28839e2af450a5f733fa257cbeb
Author: zml1206 <[email protected]>
AuthorDate: Thu Nov 16 10:12:43 2023 +0800
    [KYUUBI #5690][AUTHZ] Support insert into/overwrite path-based table for Delta Lake in Authz
### _Why are the changes needed?_
To close #5690.
Support insert into/overwrite path-based table for Delta Lake in the Authz plugin.
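A minimal sketch of the statements now covered (the path and source table below are illustrative placeholders, not names from this patch):
```scala
// Illustrative only: writing to a Delta path-based table, assuming an
// active SparkSession bound to `spark` and the Delta connector enabled.
// `path` and `default.src` are placeholders, not taken from this patch.
val path = "/tmp/delta/tbl"
spark.sql(s"INSERT INTO delta.`$path` SELECT * FROM default.src")
spark.sql(s"INSERT OVERWRITE delta.`$path` SELECT * FROM default.src")
// With the Ranger-backed Authz plugin, a user lacking [write] privilege
// on the path now gets an AccessControlException for both statements.
```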
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5691 from zml1206/KYUUBI-5690.
Closes #5690
e1506ca93 [zml1206] update
704ce2c6e [zml1206] fix
bf3471109 [zml1206] Support insert into/overwrite path-based table for Delta Lake in Authz
Authored-by: zml1206 <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
...he.kyuubi.plugin.spark.authz.serde.URIExtractor | 1 +
.../src/main/resources/table_command_spec.json | 18 ++++++++++---
.../plugin/spark/authz/serde/tableExtractors.scala | 15 ++++++-----
.../plugin/spark/authz/serde/uriExtractors.scala | 16 +++++++++++-
.../plugin/spark/authz/gen/TableCommands.scala | 6 +++--
.../DeltaCatalogRangerSparkExtensionSuite.scala | 30 ++++++++++++++++++++++
6 files changed, 73 insertions(+), 13 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor
index fb869f602..60f761233 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor
@@ -18,6 +18,7 @@
org.apache.kyuubi.plugin.spark.authz.serde.BaseRelationFileIndexURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogStorageFormatURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableURIExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.DataSourceV2RelationURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.IdentifierURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.PartitionLocsSeqURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.PropertiesLocationUriExtractor
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 57fa3b267..4442e6868 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -79,7 +79,11 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ],
- "uriDescs" : [ ]
+ "uriDescs" : [ {
+ "fieldName" : "table",
+ "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+ "isInput" : false
+ } ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.CacheTable",
"tableDescs" : [ ],
@@ -365,7 +369,11 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ],
- "uriDescs" : [ ]
+ "uriDescs" : [ {
+ "fieldName" : "table",
+ "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+ "isInput" : false
+ } ]
}, {
"classname" :
"org.apache.spark.sql.catalyst.plans.logical.OverwritePartitionsDynamic",
"tableDescs" : [ {
@@ -387,7 +395,11 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ],
- "uriDescs" : [ ]
+ "uriDescs" : [ {
+ "fieldName" : "table",
+ "fieldExtractor" : "DataSourceV2RelationURIExtractor",
+ "isInput" : false
+ } ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.RefreshTable",
"tableDescs" : [ {
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index ce595c66b..82448f9cd 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.connector.catalog.Identifier
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.types.DataType
import org.apache.spark.unsafe.types.UTF8String
@@ -186,18 +187,18 @@ class ExpressionSeqTableExtractor extends TableExtractor {
class DataSourceV2RelationTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
val plan = v1.asInstanceOf[LogicalPlan]
- val maybeV2Relation = plan.find(_.getClass.getSimpleName == "DataSourceV2Relation")
- maybeV2Relation match {
- case None => None
- case Some(v2Relation) =>
- val maybeCatalogPlugin = invokeAs[Option[AnyRef]](v2Relation, "catalog")
- val maybeCatalog = maybeCatalogPlugin.flatMap(catalogPlugin =>
+ plan.find(_.getClass.getSimpleName == "DataSourceV2Relation").get match {
+ case v2Relation: DataSourceV2Relation
+ if v2Relation.identifier == None ||
+ !isPathIdentifier(v2Relation.identifier.get.name(), spark) =>
+ val maybeCatalog = v2Relation.catalog.flatMap(catalogPlugin =>
lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogPlugin))
- lookupExtractor[TableTableExtractor].apply(spark, invokeAs[AnyRef](v2Relation, "table"))
+ lookupExtractor[TableTableExtractor].apply(spark, v2Relation.table)
.map { table =>
val maybeOwner = TableExtractor.getOwner(v2Relation)
table.copy(catalog = maybeCatalog, owner = maybeOwner)
}
+ case _ => None
}
}
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
index 7d4833516..eff842746 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
@@ -19,9 +19,10 @@ package org.apache.kyuubi.plugin.spark.authz.serde
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.kyuubi.plugin.spark.authz.util.PathIdentifier._
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
@@ -115,3 +116,16 @@ class SubqueryAliasURIExtractor extends URIExtractor {
Seq(identifier.name).map(Uri)
}
}
+
+class DataSourceV2RelationURIExtractor extends URIExtractor {
+ override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
+ val plan = v1.asInstanceOf[LogicalPlan]
+ plan.find(_.getClass.getSimpleName == "DataSourceV2Relation").get match {
+ case v2Relation: DataSourceV2Relation
+ if v2Relation.identifier != None &&
+ isPathIdentifier(v2Relation.identifier.get.name, spark) =>
+ Seq(v2Relation.identifier.get.name).map(Uri)
+ case _ => Nil
+ }
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
index 4d7dc2ac5..ee0345a8c 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
@@ -270,7 +270,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
"table",
classOf[DataSourceV2RelationTableExtractor],
actionTypeDesc = Some(actionTypeDesc))
- TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+ val uriDescs = Seq(UriDesc("table",
classOf[DataSourceV2RelationURIExtractor]))
+ TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc),
uriDescs = uriDescs)
}
val ReplaceData = {
@@ -308,7 +309,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
"table",
classOf[DataSourceV2RelationTableExtractor],
actionTypeDesc = Some(actionTypeDesc))
- TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc))
+ val uriDescs = Seq(UriDesc("table",
classOf[DataSourceV2RelationURIExtractor]))
+ TableCommandSpec(cmd, Seq(tableDesc), queryDescs = Seq(queryQueryDesc),
uriDescs = uriDescs)
}
val OverwritePartitionsDynamic = {
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
index 514b7970b..331bd380d 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
@@ -357,6 +357,36 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
doAs(admin, sql(updateTableSql))
})
}
+
+ test("insert path-based table") {
+ withSingleCallEnabled {
+ withCleanTmpResources(Seq((s"$namespace1.$table2", "table"),
(s"$namespace1", "database"))) {
+ doAs(admin, sql(s"CREATE DATABASE IF NOT EXISTS $namespace1"))
+ doAs(admin, sql(createTableSql(namespace1, table2)))
+ withTempDir(path => {
+ doAs(admin, sql(createPathBasedTableSql(path)))
+ // insert into
+ val insertIntoSql = s"INSERT INTO delta.`$path` SELECT * FROM $namespace1.$table2"
+ interceptContains[AccessControlException](
+ doAs(someone, sql(insertIntoSql)))(
+ s"does not have [select] privilege on [$namespace1/$table2/id," +
+ s"$namespace1/$table2/name,$namespace1/$table2/gender," +
+ s"$namespace1/$table2/birthDate], [write] privilege on [[$path,
$path/]]")
+ doAs(admin, sql(insertIntoSql))
+
+ // insert overwrite
+ val insertOverwriteSql =
+ s"INSERT OVERWRITE delta.`$path` SELECT * FROM $namespace1.$table2"
+ interceptContains[AccessControlException](
+ doAs(someone, sql(insertOverwriteSql)))(
+ s"does not have [select] privilege on [$namespace1/$table2/id," +
+ s"$namespace1/$table2/name,$namespace1/$table2/gender," +
+ s"$namespace1/$table2/birthDate], [write] privilege on [[$path,
$path/]]")
+ doAs(admin, sql(insertOverwriteSql))
+ })
+ }
+ }
+ }
}
object DeltaCatalogRangerSparkExtensionSuite {