This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 93ba8f762 [KYUUBI #4712] Bump Spark from 3.2.3 to 3.2.4
93ba8f762 is described below
commit 93ba8f762f35e5467dbb6cd51ef4e82ba2f74d05
Author: Anurag Rajawat <[email protected]>
AuthorDate: Mon Apr 17 09:14:20 2023 +0800
[KYUUBI #4712] Bump Spark from 3.2.3 to 3.2.4
### _Why are the changes needed?_
Fixes #4712
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4718 from anurag-rajawat/upgrade-spark.
Closes #4712
79dcf1b79 [Anurag Rajawat] Bump Spark from 3.2.3 to 3.2.4
Authored-by: Anurag Rajawat <[email protected]>
Signed-off-by: liangbowen <[email protected]>
---
.github/workflows/master.yml | 2 +-
...uubi.plugin.spark.authz.serde.FunctionExtractor | 1 +
....plugin.spark.authz.serde.FunctionTypeExtractor | 1 +
.../src/main/resources/scan_command_spec.json | 29 ---
.../src/main/resources/scan_spec.json | 89 ++++++++++
.../src/main/resources/table_command_spec.json | 16 +-
.../plugin/spark/authz/PrivilegesBuilder.scala | 23 +++
.../plugin/spark/authz/serde/CommandSpec.scala | 16 +-
.../spark/authz/serde/functionExtractors.scala | 22 +++
.../spark/authz/serde/functionTypeExtractors.scala | 36 +++-
.../kyuubi/plugin/spark/authz/serde/package.scala | 20 ++-
.../authz/FunctionPrivilegesBuilderSuite.scala | 196 +++++++++++++++++++++
.../spark/authz/gen/JsonSpecFileGenerator.scala | 9 +-
.../kyuubi/plugin/spark/authz/gen/Scans.scala | 28 ++-
.../plugin/spark/authz/gen/TableCommands.scala | 2 +-
pom.xml | 2 +-
16 files changed, 436 insertions(+), 56 deletions(-)
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index ae5b8188d..ece87e265 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -61,7 +61,7 @@ jobs:
comment: 'verify-on-spark-3.1-binary'
- java: 8
spark: '3.3'
- spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.3 -Dspark.archive.name=spark-3.2.3-bin-hadoop3.2.tgz'
+ spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.IcebergTest'
comment: 'verify-on-spark-3.2-binary'
env:
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.FunctionExtractor b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.FunctionExtractor
index 4686bb033..2facb004a 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.FunctionExtractor
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.FunctionExtractor
@@ -17,4 +17,5 @@
org.apache.kyuubi.plugin.spark.authz.serde.ExpressionInfoFunctionExtractor
org.apache.kyuubi.plugin.spark.authz.serde.FunctionIdentifierFunctionExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.QualifiedNameStringFunctionExtractor
org.apache.kyuubi.plugin.spark.authz.serde.StringFunctionExtractor
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.FunctionTypeExtractor b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.FunctionTypeExtractor
index 475f47afc..3bb0ee6c2 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.FunctionTypeExtractor
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/META-INF/services/org.apache.kyuubi.plugin.spark.authz.serde.FunctionTypeExtractor
@@ -17,4 +17,5 @@
org.apache.kyuubi.plugin.spark.authz.serde.ExpressionInfoFunctionTypeExtractor
org.apache.kyuubi.plugin.spark.authz.serde.FunctionIdentifierFunctionTypeExtractor
+org.apache.kyuubi.plugin.spark.authz.serde.FunctionNameFunctionTypeExtractor
org.apache.kyuubi.plugin.spark.authz.serde.TempMarkerFunctionTypeExtractor
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/scan_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/scan_command_spec.json
deleted file mode 100644
index 9a6aef4ed..000000000
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/scan_command_spec.json
+++ /dev/null
@@ -1,29 +0,0 @@
-[ {
- "classname" :
"org.apache.kyuubi.plugin.spark.authz.util.PermanentViewMarker",
- "scanDescs" : [ {
- "fieldName" : "catalogTable",
- "fieldExtractor" : "CatalogTableTableExtractor",
- "catalogDesc" : null
- } ]
-}, {
- "classname" : "org.apache.spark.sql.catalyst.catalog.HiveTableRelation",
- "scanDescs" : [ {
- "fieldName" : "tableMeta",
- "fieldExtractor" : "CatalogTableTableExtractor",
- "catalogDesc" : null
- } ]
-}, {
- "classname" : "org.apache.spark.sql.execution.datasources.LogicalRelation",
- "scanDescs" : [ {
- "fieldName" : "catalogTable",
- "fieldExtractor" : "CatalogTableOptionTableExtractor",
- "catalogDesc" : null
- } ]
-}, {
- "classname" :
"org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation",
- "scanDescs" : [ {
- "fieldName" : null,
- "fieldExtractor" : "DataSourceV2RelationTableExtractor",
- "catalogDesc" : null
- } ]
-} ]
\ No newline at end of file
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/scan_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/scan_spec.json
new file mode 100644
index 000000000..3273ccbea
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/scan_spec.json
@@ -0,0 +1,89 @@
+[ {
+ "classname" :
"org.apache.kyuubi.plugin.spark.authz.util.PermanentViewMarker",
+ "scanDescs" : [ {
+ "fieldName" : "catalogTable",
+ "fieldExtractor" : "CatalogTableTableExtractor",
+ "catalogDesc" : null
+ } ],
+ "functionDescs" : [ ]
+}, {
+ "classname" : "org.apache.spark.sql.catalyst.catalog.HiveTableRelation",
+ "scanDescs" : [ {
+ "fieldName" : "tableMeta",
+ "fieldExtractor" : "CatalogTableTableExtractor",
+ "catalogDesc" : null
+ } ],
+ "functionDescs" : [ ]
+}, {
+ "classname" : "org.apache.spark.sql.execution.datasources.LogicalRelation",
+ "scanDescs" : [ {
+ "fieldName" : "catalogTable",
+ "fieldExtractor" : "CatalogTableOptionTableExtractor",
+ "catalogDesc" : null
+ } ],
+ "functionDescs" : [ ]
+}, {
+ "classname" :
"org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation",
+ "scanDescs" : [ {
+ "fieldName" : null,
+ "fieldExtractor" : "DataSourceV2RelationTableExtractor",
+ "catalogDesc" : null
+ } ],
+ "functionDescs" : [ ]
+}, {
+ "classname" : "org.apache.spark.sql.hive.HiveGenericUDF",
+ "scanDescs" : [ ],
+ "functionDescs" : [ {
+ "fieldName" : "name",
+ "fieldExtractor" : "QualifiedNameStringFunctionExtractor",
+ "databaseDesc" : null,
+ "functionTypeDesc" : {
+ "fieldName" : "name",
+ "fieldExtractor" : "FunctionNameFunctionTypeExtractor",
+ "skipTypes" : [ "TEMP", "SYSTEM" ]
+ },
+ "isInput" : true
+ } ]
+}, {
+ "classname" : "org.apache.spark.sql.hive.HiveGenericUDTF",
+ "scanDescs" : [ ],
+ "functionDescs" : [ {
+ "fieldName" : "name",
+ "fieldExtractor" : "QualifiedNameStringFunctionExtractor",
+ "databaseDesc" : null,
+ "functionTypeDesc" : {
+ "fieldName" : "name",
+ "fieldExtractor" : "FunctionNameFunctionTypeExtractor",
+ "skipTypes" : [ "TEMP", "SYSTEM" ]
+ },
+ "isInput" : true
+ } ]
+}, {
+ "classname" : "org.apache.spark.sql.hive.HiveSimpleUDF",
+ "scanDescs" : [ ],
+ "functionDescs" : [ {
+ "fieldName" : "name",
+ "fieldExtractor" : "QualifiedNameStringFunctionExtractor",
+ "databaseDesc" : null,
+ "functionTypeDesc" : {
+ "fieldName" : "name",
+ "fieldExtractor" : "FunctionNameFunctionTypeExtractor",
+ "skipTypes" : [ "TEMP", "SYSTEM" ]
+ },
+ "isInput" : true
+ } ]
+}, {
+ "classname" : "org.apache.spark.sql.hive.HiveUDAFFunction",
+ "scanDescs" : [ ],
+ "functionDescs" : [ {
+ "fieldName" : "name",
+ "fieldExtractor" : "QualifiedNameStringFunctionExtractor",
+ "databaseDesc" : null,
+ "functionTypeDesc" : {
+ "fieldName" : "name",
+ "fieldExtractor" : "FunctionNameFunctionTypeExtractor",
+ "skipTypes" : [ "TEMP", "SYSTEM" ]
+ },
+ "isInput" : true
+ } ]
+} ]
\ No newline at end of file
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
index 81ccd8da0..3d6fcd93b 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
+++ b/extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json
@@ -1243,14 +1243,6 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
-}, {
- "classname" : "org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand",
- "tableDescs" : [ ],
- "opType" : "QUERY",
- "queryDescs" : [ {
- "fieldName" : "query",
- "fieldExtractor" : "LogicalPlanQueryExtractor"
- } ]
}, {
"classname" : "org.apache.spark.sql.execution.datasources.RefreshTable",
"tableDescs" : [ {
@@ -1293,6 +1285,14 @@
"fieldName" : "query",
"fieldExtractor" : "LogicalPlanQueryExtractor"
} ]
+}, {
+ "classname" : "org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand",
+ "tableDescs" : [ ],
+ "opType" : "QUERY",
+ "queryDescs" : [ {
+ "fieldName" : "query",
+ "fieldExtractor" : "LogicalPlanQueryExtractor"
+ } ]
}, {
"classname" : "org.apache.spark.sql.hive.execution.InsertIntoHiveTable",
"tableDescs" : [ {
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
index b8220ea27..98e436189 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala
@@ -235,4 +235,27 @@ object PrivilegesBuilder {
}
(inputObjs, outputObjs, opType)
}
+
+ /**
+ * Build input privilege objects from a Spark LogicalPlan for Hive permanent functions
+ *
+ * For `Command`s and other queries, build inputs.
+ *
+ * @param plan A Spark LogicalPlan
+ */
+ def buildFunctionPrivileges(
+ plan: LogicalPlan,
+ spark: SparkSession): PrivilegesAndOpType = {
+ val inputObjs = new ArrayBuffer[PrivilegeObject]
+ plan transformAllExpressions {
+ case hiveFunction: Expression if isKnowFunction(hiveFunction) =>
+ val functionSpec: ScanSpec = getFunctionSpec(hiveFunction)
+ if (functionSpec.functionDescs.exists(!_.functionTypeDesc.get.skip(hiveFunction, spark))) {
+ functionSpec.functions(hiveFunction).foreach(func =>
+ inputObjs += PrivilegeObject(func))
+ }
+ hiveFunction
+ }
+ (inputObjs, Seq.empty, OperationType.QUERY)
+ }
}
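
For context, a minimal sketch of how the new builder is exercised (it mirrors the FunctionPrivilegesBuilderSuite added later in this commit; an active `spark` session, a table `t`, and a permanent Hive UDF `kyuubi_fun_1` are assumed):

    val plan = spark.sql("SELECT kyuubi_fun_1(value) FROM t").queryExecution.analyzed
    val (inputs, outputs, opType) = PrivilegesBuilder.buildFunctionPrivileges(plan, spark)
    // inputs: one FUNCTION privilege object per permanent Hive UDF found in the plan
    // outputs is always Seq.empty and opType is always OperationType.QUERY for this builder
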
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/CommandSpec.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/CommandSpec.scala
index e96ef8cbf..32ad30e21 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/CommandSpec.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/CommandSpec.scala
@@ -19,6 +19,7 @@ package org.apache.kyuubi.plugin.spark.authz.serde
import com.fasterxml.jackson.annotation.JsonIgnore
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.slf4j.LoggerFactory
@@ -94,7 +95,8 @@ case class TableCommandSpec(
case class ScanSpec(
classname: String,
- scanDescs: Seq[ScanDesc]) extends CommandSpec {
+ scanDescs: Seq[ScanDesc],
+ functionDescs: Seq[FunctionDesc] = Seq.empty) extends CommandSpec {
override def opType: String = OperationType.QUERY.toString
def tables: (LogicalPlan, SparkSession) => Seq[Table] = (plan, spark) => {
scanDescs.flatMap { td =>
@@ -107,4 +109,16 @@ case class ScanSpec(
}
}
}
+
+ def functions: (Expression) => Seq[Function] = (expr) => {
+ functionDescs.flatMap { fd =>
+ try {
+ Some(fd.extract(expr))
+ } catch {
+ case e: Exception =>
+ LOG.debug(fd.error(expr, e))
+ None
+ }
+ }
+ }
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/functionExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/functionExtractors.scala
index 894a6cb8f..729521200 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/functionExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/functionExtractors.scala
@@ -20,12 +20,23 @@ package org.apache.kyuubi.plugin.spark.authz.serde
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions.ExpressionInfo
+import org.apache.kyuubi.plugin.spark.authz.serde.FunctionExtractor.buildFunctionIdentFromQualifiedName
+
trait FunctionExtractor extends (AnyRef => Function) with Extractor
object FunctionExtractor {
val functionExtractors: Map[String, FunctionExtractor] = {
loadExtractorsToMap[FunctionExtractor]
}
+
+ def buildFunctionIdentFromQualifiedName(qualifiedName: String): (String, Option[String]) = {
+ val parts: Array[String] = qualifiedName.split("\\.", 2)
+ if (parts.length == 1) {
+ (qualifiedName, None)
+ } else {
+ (parts.last, Some(parts.head))
+ }
+ }
}
/**
@@ -37,6 +48,17 @@ class StringFunctionExtractor extends FunctionExtractor {
}
}
+/**
+ * String
+ */
+class QualifiedNameStringFunctionExtractor extends FunctionExtractor {
+ override def apply(v1: AnyRef): Function = {
+ val qualifiedName: String = v1.asInstanceOf[String]
+ val (funcName, database) = buildFunctionIdentFromQualifiedName(qualifiedName)
+ Function(database, funcName)
+ }
+}
+
/**
* org.apache.spark.sql.catalyst.FunctionIdentifier
*/
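
A quick sketch of the splitting behavior introduced by buildFunctionIdentFromQualifiedName above (expected results shown as comments):

    import org.apache.kyuubi.plugin.spark.authz.serde.FunctionExtractor.buildFunctionIdentFromQualifiedName

    buildFunctionIdentFromQualifiedName("default.kyuubi_fun_1") // ("kyuubi_fun_1", Some("default"))
    buildFunctionIdentFromQualifiedName("kyuubi_fun_1")         // ("kyuubi_fun_1", None)
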
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/functionTypeExtractors.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/functionTypeExtractors.scala
index 4c5e9dc84..193a00fa5 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/functionTypeExtractors.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/functionTypeExtractors.scala
@@ -19,8 +19,11 @@ package org.apache.kyuubi.plugin.spark.authz.serde
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.kyuubi.plugin.spark.authz.serde.FunctionExtractor.buildFunctionIdentFromQualifiedName
import org.apache.kyuubi.plugin.spark.authz.serde.FunctionType.{FunctionType, PERMANENT, SYSTEM, TEMP}
+import org.apache.kyuubi.plugin.spark.authz.serde.FunctionTypeExtractor.getFunctionType
object FunctionType extends Enumeration {
type FunctionType = Value
@@ -33,6 +36,17 @@ object FunctionTypeExtractor {
val functionTypeExtractors: Map[String, FunctionTypeExtractor] = {
loadExtractorsToMap[FunctionTypeExtractor]
}
+
+ def getFunctionType(fi: FunctionIdentifier, catalog: SessionCatalog): FunctionType = {
+ fi match {
+ case permanent if catalog.isPersistentFunction(permanent) =>
+ PERMANENT
+ case system if catalog.isRegisteredFunction(system) =>
+ SYSTEM
+ case _ =>
+ TEMP
+ }
+ }
}
/**
@@ -66,14 +80,18 @@ class FunctionIdentifierFunctionTypeExtractor extends FunctionTypeExtractor {
override def apply(v1: AnyRef, spark: SparkSession): FunctionType = {
val catalog = spark.sessionState.catalog
val fi = v1.asInstanceOf[FunctionIdentifier]
- if (catalog.isTemporaryFunction(fi)) {
- TEMP
- } else if (catalog.isPersistentFunction(fi)) {
- PERMANENT
- } else if (catalog.isRegisteredFunction(fi)) {
- SYSTEM
- } else {
- TEMP
- }
+ getFunctionType(fi, catalog)
+ }
+}
+
+/**
+ * String
+ */
+class FunctionNameFunctionTypeExtractor extends FunctionTypeExtractor {
+ override def apply(v1: AnyRef, spark: SparkSession): FunctionType = {
+ val catalog: SessionCatalog = spark.sessionState.catalog
+ val qualifiedName: String = v1.asInstanceOf[String]
+ val (funcName, database) = buildFunctionIdentFromQualifiedName(qualifiedName)
+ getFunctionType(FunctionIdentifier(funcName, database), catalog)
}
}
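
With the refactored getFunctionType, persistent functions take precedence over registered ones and everything else falls through to TEMP. An illustrative sketch, assuming an active SparkSession and the import of FunctionTypeExtractor.getFunctionType added above:

    import org.apache.spark.sql.catalyst.FunctionIdentifier

    val catalog = spark.sessionState.catalog
    getFunctionType(FunctionIdentifier("kyuubi_fun_1", Some("default")), catalog) // PERMANENT, if persisted in the metastore
    getFunctionType(FunctionIdentifier("upper"), catalog)                         // SYSTEM, e.g. a built-in registered in the FunctionRegistry
    // names that are neither persistent nor registered fall through to TEMP
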
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala
index a52a558a0..07f91a95d 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/serde/package.scala
@@ -66,9 +66,10 @@ package object serde {
}
final private lazy val SCAN_SPECS: Map[String, ScanSpec] = {
- val is = getClass.getClassLoader.getResourceAsStream("scan_command_spec.json")
+ val is = getClass.getClassLoader.getResourceAsStream("scan_spec.json")
mapper.readValue(is, new TypeReference[Array[ScanSpec]] {})
- .map(e => (e.classname, e)).toMap
+ .map(e => (e.classname, e))
+ .filter(t => t._2.scanDescs.nonEmpty).toMap
}
def isKnownScan(r: AnyRef): Boolean = {
@@ -79,6 +80,21 @@ package object serde {
SCAN_SPECS(r.getClass.getName)
}
+ final private lazy val FUNCTION_SPECS: Map[String, ScanSpec] = {
+ val is = getClass.getClassLoader.getResourceAsStream("scan_spec.json")
+ mapper.readValue(is, new TypeReference[Array[ScanSpec]] {})
+ .map(e => (e.classname, e))
+ .filter(t => t._2.functionDescs.nonEmpty).toMap
+ }
+
+ def isKnowFunction(r: AnyRef): Boolean = {
+ FUNCTION_SPECS.contains(r.getClass.getName)
+ }
+
+ def getFunctionSpec(r: AnyRef): ScanSpec = {
+ FUNCTION_SPECS(r.getClass.getName)
+ }
+
def operationType(plan: LogicalPlan): OperationType = {
val classname = plan.getClass.getName
TABLE_COMMAND_SPECS.get(classname)
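
Both lookup maps are now loaded from the same scan_spec.json and partitioned by which descriptor list is non-empty. Illustrative behavior, where hiveUDF is assumed to be an org.apache.spark.sql.hive.HiveSimpleUDF expression:

    isKnowFunction(hiveUDF) // true: its spec carries functionDescs
    isKnownScan(hiveUDF)    // false: its spec has no scanDescs, so it is filtered out of SCAN_SPECS
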
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala
new file mode 100644
index 000000000..e8da4e871
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+// scalastyle:off
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.kyuubi.plugin.spark.authz.OperationType.QUERY
+import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType
+
+abstract class FunctionPrivilegesBuilderSuite extends AnyFunSuite
+ with SparkSessionProvider with BeforeAndAfterAll with BeforeAndAfterEach {
+ // scalastyle:on
+
+ protected def withTable(t: String)(f: String => Unit): Unit = {
+ try {
+ f(t)
+ } finally {
+ sql(s"DROP TABLE IF EXISTS $t")
+ }
+ }
+
+ protected def withDatabase(t: String)(f: String => Unit): Unit = {
+ try {
+ f(t)
+ } finally {
+ sql(s"DROP DATABASE IF EXISTS $t")
+ }
+ }
+
+ protected def checkColumns(plan: LogicalPlan, cols: Seq[String]): Unit = {
+ val (in, out, _) = PrivilegesBuilder.build(plan, spark)
+ assert(out.isEmpty, "Queries shall not check output privileges")
+ val po = in.head
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
+ assert(po.columns === cols)
+ }
+
+ protected def checkColumns(query: String, cols: Seq[String]): Unit = {
+ checkColumns(sql(query).queryExecution.optimizedPlan, cols)
+ }
+
+ protected val reusedDb: String = getClass.getSimpleName
+ protected val reusedDb2: String = getClass.getSimpleName + "2"
+ protected val reusedTable: String = reusedDb + "." + getClass.getSimpleName
+ protected val reusedTableShort: String = reusedTable.split("\\.").last
+ protected val reusedPartTable: String = reusedTable + "_part"
+ protected val reusedPartTableShort: String = reusedPartTable.split("\\.").last
+ protected val functionCount = 3
+ protected val functionNamePrefix = "kyuubi_fun_"
+ protected val tempFunNamePrefix = "kyuubi_temp_fun_"
+
+ override def beforeAll(): Unit = {
+ sql(s"CREATE DATABASE IF NOT EXISTS $reusedDb")
+ sql(s"CREATE DATABASE IF NOT EXISTS $reusedDb2")
+ sql(s"CREATE TABLE IF NOT EXISTS $reusedTable" +
+ s" (key int, value string) USING parquet")
+ sql(s"CREATE TABLE IF NOT EXISTS $reusedPartTable" +
+ s" (key int, value string, pid string) USING parquet" +
+ s" PARTITIONED BY(pid)")
+ // scalastyle:off
+ (0 until functionCount).foreach { index =>
+ {
+ sql(s"CREATE FUNCTION ${reusedDb}.${functionNamePrefix}${index} AS
'org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash'")
+ sql(s"CREATE FUNCTION ${reusedDb2}.${functionNamePrefix}${index} AS
'org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash'")
+ sql(s"CREATE TEMPORARY FUNCTION ${tempFunNamePrefix}${index} AS
'org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash'")
+ }
+ }
+ sql(s"USE ${reusedDb2}")
+ // scalastyle:on
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ Seq(reusedTable, reusedPartTable).foreach { t =>
+ sql(s"DROP TABLE IF EXISTS $t")
+ }
+
+ Seq(reusedDb, reusedDb2).foreach { db =>
+ (0 until functionCount).foreach { index =>
+ sql(s"DROP FUNCTION ${db}.${functionNamePrefix}${index}")
+ }
+ sql(s"DROP DATABASE IF EXISTS ${db}")
+ }
+
+ spark.stop()
+ super.afterAll()
+ }
+}
+
+class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite {
+
+ override protected val catalogImpl: String = "hive"
+
+ test("Function Call Query") {
+ val plan = sql(s"SELECT kyuubi_fun_1('data'), " +
+ s"kyuubi_fun_2(value), " +
+ s"${reusedDb}.kyuubi_fun_0(value), " +
+ s"kyuubi_temp_fun_1('data2')," +
+ s"kyuubi_temp_fun_2(key) " +
+ s"FROM $reusedTable").queryExecution.analyzed
+ val (inputs, _, _) = PrivilegesBuilder.buildFunctionPrivileges(plan, spark)
+ assert(inputs.size === 3)
+ inputs.foreach { po =>
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
+ assert(po.dbname startsWith reusedDb.toLowerCase)
+ assert(po.objectName startsWith functionNamePrefix.toLowerCase)
+ val accessType = ranger.AccessType(po, QUERY, isInput = true)
+ assert(accessType === AccessType.SELECT)
+ }
+ }
+
+ test("Function Call Query with Quoted Name") {
+ val plan = sql(s"SELECT `kyuubi_fun_1`('data'), " +
+ s"`kyuubi_fun_2`(value), " +
+ s"`${reusedDb}`.`kyuubi_fun_0`(value), " +
+ s"`kyuubi_temp_fun_1`('data2')," +
+ s"`kyuubi_temp_fun_2`(key) " +
+ s"FROM $reusedTable").queryExecution.analyzed
+ val (inputs, _, _) = PrivilegesBuilder.buildFunctionPrivileges(plan, spark)
+ assert(inputs.size === 3)
+ inputs.foreach { po =>
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
+ assert(po.dbname startsWith reusedDb.toLowerCase)
+ assert(po.objectName startsWith functionNamePrefix.toLowerCase)
+ val accessType = ranger.AccessType(po, QUERY, isInput = true)
+ assert(accessType === AccessType.SELECT)
+ }
+ }
+
+ test("Simple Function Call Query") {
+ val plan = sql(s"SELECT kyuubi_fun_1('data'), " +
+ s"kyuubi_fun_0('value'), " +
+ s"${reusedDb}.kyuubi_fun_0('value'), " +
+ s"${reusedDb}.kyuubi_fun_2('value'), " +
+ s"kyuubi_temp_fun_1('data2')," +
+ s"kyuubi_temp_fun_2('key') ").queryExecution.analyzed
+ val (inputs, _, _) = PrivilegesBuilder.buildFunctionPrivileges(plan, spark)
+ assert(inputs.size === 4)
+ inputs.foreach { po =>
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
+ assert(po.dbname startsWith reusedDb.toLowerCase)
+ assert(po.objectName startsWith functionNamePrefix.toLowerCase)
+ val accessType = ranger.AccessType(po, QUERY, isInput = true)
+ assert(accessType === AccessType.SELECT)
+ }
+ }
+
+ test("Function Call In CAST Command") {
+ val table = "castTable"
+ withTable(table) { table =>
+ val plan = sql(s"CREATE TABLE ${table} " +
+ s"SELECT kyuubi_fun_1('data') col1, " +
+ s"${reusedDb2}.kyuubi_fun_2(value) col2, " +
+ s"kyuubi_fun_0(value) col3, " +
+ s"kyuubi_fun_2('value') col4, " +
+ s"${reusedDb}.kyuubi_fun_2('value') col5, " +
+ s"${reusedDb}.kyuubi_fun_1('value') col6, " +
+ s"kyuubi_temp_fun_1('data2') col7, " +
+ s"kyuubi_temp_fun_2(key) col8 " +
+ s"FROM ${reusedTable} WHERE
${reusedDb2}.kyuubi_fun_1(key)='123'").queryExecution.analyzed
+ val (inputs, _, _) = PrivilegesBuilder.buildFunctionPrivileges(plan, spark)
+ assert(inputs.size === 7)
+ inputs.foreach { po =>
+ assert(po.actionType === PrivilegeObjectActionType.OTHER)
+ assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
+ assert(po.dbname startsWith reusedDb.toLowerCase)
+ assert(po.objectName startsWith functionNamePrefix.toLowerCase)
+ val accessType = ranger.AccessType(po, QUERY, isInput = true)
+ assert(accessType === AccessType.SELECT)
+ }
+ }
+ }
+
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
index 7c7ed138b..e20cd13d7 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/JsonSpecFileGenerator.scala
@@ -34,13 +34,16 @@ object JsonSpecFileGenerator {
writeCommandSpecJson("database", DatabaseCommands.data)
writeCommandSpecJson("table", TableCommands.data ++ IcebergCommands.data)
writeCommandSpecJson("function", FunctionCommands.data)
- writeCommandSpecJson("scan", Scans.data)
+ writeCommandSpecJson("scan", Scans.data, isScanResource = true)
}
- def writeCommandSpecJson[T <: CommandSpec](commandType: String, specArr:
Array[T]): Unit = {
+ def writeCommandSpecJson[T <: CommandSpec](
+ commandType: String,
+ specArr: Array[T],
+ isScanResource: Boolean = false): Unit = {
val pluginHome = getClass.getProtectionDomain.getCodeSource.getLocation.getPath
.split("target").head
- val filename = s"${commandType}_command_spec.json"
+ val filename = s"${commandType}${if (isScanResource) "" else
"_command"}_spec.json"
val writer = {
val p = Paths.get(pluginHome, "src", "main", "resources", filename)
Files.newBufferedWriter(p, StandardCharsets.UTF_8)
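
Under the revised naming scheme, the two generator calls above resolve to the following files (derived from the filename expression; shown for illustration):

    writeCommandSpecJson("table", TableCommands.data ++ IcebergCommands.data) // -> table_command_spec.json
    writeCommandSpecJson("scan", Scans.data, isScanResource = true)           // -> scan_spec.json
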
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/Scans.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/Scans.scala
index 7bd8260bb..b2c1868a2 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/Scans.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/Scans.scala
@@ -18,6 +18,7 @@
package org.apache.kyuubi.plugin.spark.authz.gen
import org.apache.kyuubi.plugin.spark.authz.serde._
+import org.apache.kyuubi.plugin.spark.authz.serde.FunctionType._
object Scans {
@@ -57,9 +58,34 @@ object Scans {
ScanSpec(r, Seq(tableDesc))
}
+ val HiveSimpleUDF = {
+ ScanSpec(
+ "org.apache.spark.sql.hive.HiveSimpleUDF",
+ Seq.empty,
+ Seq(FunctionDesc(
+ "name",
+ classOf[QualifiedNameStringFunctionExtractor],
+ functionTypeDesc = Some(FunctionTypeDesc(
+ "name",
+ classOf[FunctionNameFunctionTypeExtractor],
+ Seq(TEMP, SYSTEM))),
+ isInput = true)))
+ }
+
+ val HiveGenericUDF = HiveSimpleUDF.copy(classname = "org.apache.spark.sql.hive.HiveGenericUDF")
+
+ val HiveUDAFFunction = HiveSimpleUDF.copy(classname =
+ "org.apache.spark.sql.hive.HiveUDAFFunction")
+
+ val HiveGenericUDTF = HiveSimpleUDF.copy(classname = "org.apache.spark.sql.hive.HiveGenericUDTF")
+
val data: Array[ScanSpec] = Array(
HiveTableRelation,
LogicalRelation,
DataSourceV2Relation,
- PermanentViewMarker)
+ PermanentViewMarker,
+ HiveSimpleUDF,
+ HiveGenericUDF,
+ HiveUDAFFunction,
+ HiveGenericUDTF)
}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
index 7bf01b43f..4f971ba62 100644
--- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/gen/TableCommands.scala
@@ -637,7 +637,7 @@ object TableCommands {
"org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand"),
InsertIntoHadoopFsRelationCommand,
InsertIntoDataSourceDir.copy(classname =
- "org.apache.spark.sql.execution.datasources.InsertIntoDataSourceDirCommand"),
+ "org.apache.spark.sql.hive.execution.InsertIntoHiveDirCommand"),
InsertIntoHiveTable,
LoadData,
MergeIntoTable,
diff --git a/pom.xml b/pom.xml
index f772bd1b4..520b181d5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2337,7 +2337,7 @@
<module>extensions/spark/kyuubi-extension-spark-3-2</module>
</modules>
<properties>
- <spark.version>3.2.3</spark.version>
+ <spark.version>3.2.4</spark.version>
<spark.binary.version>3.2</spark.binary.version>
<delta.version>2.0.2</delta.version>
<spark.archive.name>spark-${spark.version}-bin-hadoop3.2.tgz</spark.archive.name>