This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 6b8c65d31 [KYUUBI #5664][AUTHZ] Support create path-based table for Delta Lake
6b8c65d31 is described below
commit 6b8c65d31029c7e8416a0e288edd9d6b393eb0cb
Author: zml1206 <[email protected]>
AuthorDate: Mon Nov 13 20:54:31 2023 +0800
[KYUUBI #5664][AUTHZ] Support create path-based table for Delta Lake
### _Why are the changes needed?_
To close #5664.
Support creating path-based tables for Delta Lake in the Authz plugin.
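A path-based Delta table is addressed by its storage location, e.g. delta.`/some/path`, rather than by a catalog identifier. A minimal sketch of the DDL this covers (the path and the `spark` session are illustrative):

```scala
// Illustrative only: the table is identified by its location, so the Authz plugin
// should require a [write] privilege on that path instead of on a catalog table.
spark.sql(
  """
    |CREATE TABLE IF NOT EXISTS delta.`/tmp/delta/people` (
    |  id INT,
    |  name STRING,
    |  gender STRING,
    |  birthDate TIMESTAMP
    |)
    |USING DELTA
    |PARTITIONED BY (gender)
    |""".stripMargin)
```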
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request
### _Was this patch authored or co-authored using generative AI tooling?_
No.
Closes #5666 from zml1206/KYUUBI-5664.
Closes #5664
6fee6bdf7 [zml1206] update
4c700148d [zml1206] pass spark into urlExtractors
0b186049e [zml1206] Remove hadoop dependency
52bcc02b9 [zml1206] update
8f47f574e [zml1206] Merge branch 'master' into KYUUBI-5664
8aa0d2e79 [zml1206] update
7ad43f522 [zml1206] Support create path-based table for Delta Lake in Authz
61ee82374 [zml1206] Support create path-based table for Delta Lake in Authz
Authored-by: zml1206 <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
...he.kyuubi.plugin.spark.authz.serde.URIExtractor | 1 +
.../src/main/resources/table_command_spec.json | 12 +++++++
.../plugin/spark/authz/PrivilegesBuilder.scala | 4 +--
.../plugin/spark/authz/serde/Descriptor.scala | 6 +++-
.../plugin/spark/authz/serde/tableExtractors.scala | 22 +++++++-----
.../plugin/spark/authz/serde/uriExtractors.scala | 33 +++++++++++------
.../plugin/spark/authz/util/PathIdentifier.scala | 31 ++++++++++++++++
.../plugin/spark/authz/gen/TableCommands.scala | 7 ++--
.../DeltaCatalogRangerSparkExtensionSuite.scala | 42 ++++++++++++++++++++++
9 files changed, 134 insertions(+), 24 deletions(-)
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor
index d7c859c7a..631cb7a23 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.URIExtractor
@@ -18,6 +18,7 @@
org.apache.kyuubi.plugin.spark.authz.serde.BaseRelationFileIndexURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogStorageFormatURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.CatalogTableURIExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.IdentifierURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.PartitionLocsSeqURIExtractor
org.apache.kyuubi.plugin.spark.authz.serde.PropertiesLocationUriExtractor
org.apache.kyuubi.plugin.spark.authz.serde.PropertiesPathUriExtractor
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 7b5ebad6c..f582d81de 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -156,6 +156,10 @@
"fieldName" : "properties",
"fieldExtractor" : "PropertiesLocationUriExtractor",
"isInput" : false
+ }, {
+ "fieldName" : "tableName",
+ "fieldExtractor" : "IdentifierURIExtractor",
+ "isInput" : false
} ]
}, {
"classname" :
"org.apache.spark.sql.catalyst.plans.logical.CreateTableAsSelect",
@@ -225,6 +229,10 @@
"fieldName" : "properties",
"fieldExtractor" : "PropertiesLocationUriExtractor",
"isInput" : false
+ }, {
+ "fieldName" : "tableName",
+ "fieldExtractor" : "IdentifierURIExtractor",
+ "isInput" : false
} ]
}, {
"classname" : "org.apache.spark.sql.catalyst.plans.logical.DeleteFromTable",
@@ -520,6 +528,10 @@
"fieldName" : "properties",
"fieldExtractor" : "PropertiesLocationUriExtractor",
"isInput" : false
+ }, {
+ "fieldName" : "tableName",
+ "fieldExtractor" : "IdentifierURIExtractor",
+ "isInput" : false
} ]
}, {
"classname" :
"org.apache.spark.sql.catalyst.plans.logical.ReplaceTableAsSelect",
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index 212ed74aa..df1738006 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -191,7 +191,7 @@ object PrivilegesBuilder {
}
desc.uriDescs.foreach { ud =>
try {
- val uris = ud.extract(plan)
+ val uris = ud.extract(plan, spark)
if (ud.isInput) {
inputObjs ++= uris.map(PrivilegeObject(_))
} else {
@@ -215,7 +215,7 @@ object PrivilegesBuilder {
}
spec.uriDescs.foreach { ud =>
try {
- val uris = ud.extract(plan)
+ val uris = ud.extract(plan, spark)
if (ud.isInput) {
inputObjs ++= uris.map(PrivilegeObject(_))
} else {
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala
index 5b73e7ceb..2ffad1a2f 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/Descriptor.scala
@@ -319,8 +319,12 @@ case class UriDesc(
fieldExtractor: String,
isInput: Boolean = false) extends Descriptor {
override def extract(v: AnyRef): Seq[Uri] = {
+ extract(v, SparkSession.active)
+ }
+
+ def extract(v: AnyRef, spark: SparkSession): Seq[Uri] = {
val uriVal = invokeAs[AnyRef](v, fieldName)
val uriExtractor = lookupExtractor[URIExtractor](fieldExtractor)
- uriExtractor(uriVal)
+ uriExtractor(spark, uriVal)
}
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
index c90da5d36..3e09775db 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/tableExtractors.scala
@@ -27,10 +27,12 @@ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.types.DataType
import org.apache.spark.unsafe.types.UTF8String
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
+import org.apache.kyuubi.plugin.spark.authz.util.PathIdentifier._
import org.apache.kyuubi.util.reflect.ReflectUtils._
/**
@@ -143,10 +145,10 @@ class ResolvedTableTableExtractor extends TableExtractor {
* org.apache.spark.sql.connector.catalog.Identifier
*/
class IdentifierTableExtractor extends TableExtractor {
- override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
- val namespace = invokeAs[Array[String]](v1, "namespace")
- val table = invokeAs[String](v1, "name")
- Some(Table(None, Some(quote(namespace)), table, None))
+ override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = v1 match {
+ case identifier: Identifier if !isPathIdentifier(identifier.name(), spark) =>
+ Some(Table(None, Some(quote(identifier.namespace())), identifier.name(), None))
+ case _ => None
}
}
@@ -217,12 +219,16 @@ class LogicalRelationTableExtractor extends TableExtractor {
*/
class ResolvedDbObjectNameTableExtractor extends TableExtractor {
override def apply(spark: SparkSession, v1: AnyRef): Option[Table] = {
- val catalogVal = invokeAs[AnyRef](v1, "catalog")
- val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
val nameParts = invokeAs[Seq[String]](v1, "nameParts")
- val namespace = nameParts.init.toArray
val table = nameParts.last
- Some(Table(catalog, Some(quote(namespace)), table, None))
+ if (isPathIdentifier(table, spark)) {
+ None
+ } else {
+ val catalogVal = invokeAs[AnyRef](v1, "catalog")
+ val catalog = lookupExtractor[CatalogPluginCatalogExtractor].apply(catalogVal)
+ val namespace = nameParts.init.toArray
+ Some(Table(catalog, Some(quote(namespace)), table, None))
+ }
}
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
index 77c7367fe..98fbeffde 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/uriExtractors.scala
@@ -17,12 +17,15 @@
package org.apache.kyuubi.plugin.spark.authz.serde
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable}
+import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
+import org.apache.kyuubi.plugin.spark.authz.util.PathIdentifier._
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
-trait URIExtractor extends (AnyRef => Seq[Uri]) with Extractor
+trait URIExtractor extends ((SparkSession, AnyRef) => Seq[Uri]) with Extractor
object URIExtractor {
val uriExtractors: Map[String, URIExtractor] = {
@@ -34,7 +37,7 @@ object URIExtractor {
* String
*/
class StringURIExtractor extends URIExtractor {
- override def apply(v1: AnyRef): Seq[Uri] = {
+ override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
v1 match {
case uriPath: String => Seq(Uri(uriPath))
case Some(uriPath: String) => Seq(Uri(uriPath))
@@ -44,31 +47,31 @@ class StringURIExtractor extends URIExtractor {
}
class StringSeqURIExtractor extends URIExtractor {
- override def apply(v1: AnyRef): Seq[Uri] = {
+ override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
v1.asInstanceOf[Seq[String]].map(Uri)
}
}
class CatalogStorageFormatURIExtractor extends URIExtractor {
- override def apply(v1: AnyRef): Seq[Uri] = {
+ override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
v1.asInstanceOf[CatalogStorageFormat].locationUri.map(uri => Uri(uri.getPath)).toSeq
}
}
class PropertiesPathUriExtractor extends URIExtractor {
- override def apply(v1: AnyRef): Seq[Uri] = {
+ override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
v1.asInstanceOf[Map[String, String]].get("path").map(Uri).toSeq
}
}
class PropertiesLocationUriExtractor extends URIExtractor {
- override def apply(v1: AnyRef): Seq[Uri] = {
+ override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
v1.asInstanceOf[Map[String, String]].get("location").map(Uri).toSeq
}
}
class BaseRelationFileIndexURIExtractor extends URIExtractor {
- override def apply(v1: AnyRef): Seq[Uri] = {
+ override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
v1 match {
case h: HadoopFsRelation => h.location.rootPaths.map(_.toString).map(Uri)
case _ => Nil
@@ -77,19 +80,27 @@ class BaseRelationFileIndexURIExtractor extends URIExtractor {
}
class TableSpecURIExtractor extends URIExtractor {
- override def apply(v1: AnyRef): Seq[Uri] = {
- new StringURIExtractor().apply(invokeAs[Option[String]](v1, "location"))
+ override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
+ new StringURIExtractor().apply(spark, invokeAs[Option[String]](v1, "location"))
}
}
class CatalogTableURIExtractor extends URIExtractor {
- override def apply(v1: AnyRef): Seq[Uri] = {
+ override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
v1.asInstanceOf[CatalogTable].storage.locationUri.map(_.toString).map(Uri).toSeq
}
}
class PartitionLocsSeqURIExtractor extends URIExtractor {
- override def apply(v1: AnyRef): Seq[Uri] = {
+ override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = {
v1.asInstanceOf[Seq[(_, Option[String])]].flatMap(_._2).map(Uri)
}
}
+
+class IdentifierURIExtractor extends URIExtractor {
+ override def apply(spark: SparkSession, v1: AnyRef): Seq[Uri] = v1 match {
+ case identifier: Identifier if isPathIdentifier(identifier.name(), spark) =>
+ Seq(identifier.name()).map(Uri)
+ case _ => Nil
+ }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/PathIdentifier.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/PathIdentifier.scala
new file mode 100644
index 000000000..2666c37c3
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/PathIdentifier.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.util
+
+import java.io.File
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * An object for handling table access on path-based table. This is a stop-gap solution
+ * until PathIdentifiers are implemented in Apache Spark.
+ */
+object PathIdentifier {
+ def isPathIdentifier(path: String, spark: SparkSession): Boolean =
+ spark.sessionState.conf.runSQLonFile && path != null && path.startsWith(File.separator)
+}
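For reference, a minimal sketch of how this check behaves; the local session, paths, and app name below are illustrative, and it assumes `runSQLonFile` corresponds to the `spark.sql.runSQLOnFiles` setting:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.kyuubi.plugin.spark.authz.util.PathIdentifier.isPathIdentifier

// Sketch only: a throwaway local session with run-SQL-on-files enabled.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("path-identifier-sketch")
  .config("spark.sql.runSQLOnFiles", "true")
  .getOrCreate()

// An absolute path is recognized as a path identifier and authorized as a URI ...
assert(isPathIdentifier("/tmp/delta/people", spark))
// ... while an ordinary table name keeps the table-based privilege checks.
assert(!isPathIdentifier("default.people", spark))

spark.stop()
```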
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
index faf1b49ee..4d7dc2ac5 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
@@ -216,7 +216,8 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
catalogDesc = Some(CatalogDesc()))
val uriDescs = Seq(
UriDesc("tableSpec", classOf[TableSpecURIExtractor]),
- UriDesc("properties", classOf[PropertiesLocationUriExtractor]))
+ UriDesc("properties", classOf[PropertiesLocationUriExtractor]),
+ UriDesc("tableName", classOf[IdentifierURIExtractor]))
TableCommandSpec(
cmd,
Seq(resolvedIdentifierTableDesc, tableDesc, resolvedDbObjectNameDesc),
@@ -230,7 +231,9 @@ object TableCommands extends CommandSpecs[TableCommandSpec] {
"tableName",
classOf[IdentifierTableExtractor],
catalogDesc = Some(CatalogDesc()))
- val uriDescs = Seq(UriDesc("properties",
classOf[PropertiesLocationUriExtractor]))
+ val uriDescs = Seq(
+ UriDesc("properties", classOf[PropertiesLocationUriExtractor]),
+ UriDesc("tableName", classOf[IdentifierURIExtractor]))
TableCommandSpec(cmd, Seq(tableDesc), CREATETABLE, uriDescs = uriDescs)
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
index 9c4dc42ff..d1f49763c 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/DeltaCatalogRangerSparkExtensionSuite.scala
@@ -16,6 +16,8 @@
*/
package org.apache.kyuubi.plugin.spark.authz.ranger
+import java.nio.file.Path
+
import org.scalatest.Outcome
import org.apache.kyuubi.Utils
@@ -51,6 +53,18 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
|PARTITIONED BY (gender)
|""".stripMargin
+ def createPathBasedTableSql(path: Path): String =
+ s"""
+ |CREATE TABLE IF NOT EXISTS delta.`$path` (
+ | id INT,
+ | name STRING,
+ | gender STRING,
+ | birthDate TIMESTAMP
+ |)
+ |USING DELTA
+ |PARTITIONED BY (gender)
+ |""".stripMargin
+
override def withFixture(test: NoArgTest): Outcome = {
test()
}
@@ -293,6 +307,34 @@ class DeltaCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite {
doAs(admin, sql(vacuumTableSql))
}
}
+
+ test("create path-based table") {
+ withTempDir(path => {
+ val createTableSql = createPathBasedTableSql(path)
+ interceptContains[AccessControlException] {
+ doAs(someone, sql(createTableSql))
+ }(s"does not have [write] privilege on [[$path, $path/]]")
+ doAs(admin, sql(createTableSql))
+ })
+ }
+
+ test("create or replace path-based table") {
+ withTempDir(path => {
+ val createOrReplaceTableSql =
+ s"""
+ |CREATE OR REPLACE TABLE delta.`$path` (
+ | id INT,
+ | name STRING,
+ | gender STRING,
+ | birthDate TIMESTAMP
+ |) USING DELTA
+ |""".stripMargin
+ interceptContains[AccessControlException] {
+ doAs(someone, sql(createOrReplaceTableSql))
+ }(s"does not have [write] privilege on [[$path, $path/]]")
+ doAs(admin, sql(createOrReplaceTableSql))
+ })
+ }
}
object DeltaCatalogRangerSparkExtensionSuite {