This is an automated email from the ASF dual-hosted git repository.
bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 04f443792b [KYUUBI #6754][AUTHZ] Improve the performance of Ranger
access requests deduplication
04f443792b is described below
commit 04f443792b4fad59b43454f2ac7f2413e9221d47
Author: wankunde <[email protected]>
AuthorDate: Mon Oct 21 21:17:51 2024 +0800
[KYUUBI #6754][AUTHZ] Improve the performance of Ranger access requests
deduplication
# :mag: Description
## Issue References :link:
This pull request fixes #6754
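## Describe Your Solution :wrench:
Right now RuleAuthorization collects access requests in an ArrayBuffer and
deduplicates them with a linear `exists` scan. Every new PrivilegeObject is
therefore compared with all previously collected requests, which is quadratic
in the number of objects and very slow for scans over many files.
This patch keeps the ArrayBuffer, so requests stay in input order. It adds a
HashSet of (AccessResource, AccessType) keys, so each duplicate check becomes a
single hash lookup instead of a scan.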
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan :test_tube:
#### Behavior Without This Pull Request :coffin:
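Added `RuleAuthorizationBenchmark`, which times the collection of Ranger access
requests for a scan over 50000 files.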
Before
```sh
Java HotSpot(TM) 64-Bit Server VM 17.0.12+8-LTS-286 on Mac OS X 14.6
Apple M3
Collecting files ranger access request: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
50000 files benchmark 181863 189434 NaN -0.0 -181863368958.0 1.0X
```
#### Behavior With This Pull Request :tada:
After
```sh
Java HotSpot(TM) 64-Bit Server VM 17.0.12+8-LTS-286 on Mac OS X 14.6
Apple M3
Collecting files ranger access request: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
50000 files benchmark 1281 1310 33 -0.0 -1280563000.0 1.0X
```
#### Related Unit Tests
Existing unit tests.
---
# Checklist :memo:
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
**Be nice. Be informative.**
Closes #6758 from wankunde/ranger2.
Closes #6754
9d7d1964b [wankunde] [KYUUBI #6754] Improve the performance of ranger
access requests
88b9c049b [wankun] Update
extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala
20c55fbeb [wankun] Update extensions/spark/kyuubi-spark-authz/pom.xml
f5a3b6ca5 [wankunde] [KYUUBI #6754] Improve the performance of ranger
access requests
9793249de [wankunde] [KYUUBI #6754] Improve the performance of ranger
access requests
d86b01f9c [wankunde] [KYUUBI #6754] Improve the performance of ranger
access requests
b904b491b [wankunde] [KYUUBI #6754] Improve the performance of ranger
access requests
aad08a6bb [wankunde] [KYUUBI #6754] Improve the performance of ranger
access requests
1374604bc [wankunde] [KYUUBI #6754] Improve the performance of ranger
access requests
01e15c149 [wankun] Update extensions/spark/kyuubi-spark-authz/pom.xml
805e8a9c0 [wankun] Update extensions/spark/kyuubi-spark-authz/pom.xml
e19817943 [wankunde] [KYUUBI #6754] Improve the performance of ranger
access requests
Lead-authored-by: wankunde <[email protected]>
Co-authored-by: wankun <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
---
.../RuleAuthorizationBenchmark-jdk17-results.txt | 6 ++
extensions/spark/kyuubi-spark-authz/pom.xml | 7 ++
.../spark/authz/ranger/RuleAuthorization.scala | 13 ++-
.../authz/benchmark/KyuubiBenchmarkBase.scala | 71 ++++++++++++++++
.../spark/sql/RuleAuthorizationBenchmark.scala | 94 ++++++++++++++++++++++
5 files changed, 187 insertions(+), 4 deletions(-)
diff --git
a/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt
b/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt
new file mode 100644
index 0000000000..87f7c6e3dd
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt
@@ -0,0 +1,6 @@
+Java HotSpot(TM) 64-Bit Server VM 17.0.12+8-LTS-286 on Mac OS X 14.6
+Apple M3
+Collecting files ranger access request: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
+------------------------------------------------------------------------------------------------------------------------
+50000 files benchmark 1281 1310 33 -0.0 -1280563000.0 1.0X
+
diff --git a/extensions/spark/kyuubi-spark-authz/pom.xml
b/extensions/spark/kyuubi-spark-authz/pom.xml
index 8ab3720b13..b3970cc2c8 100644
--- a/extensions/spark/kyuubi-spark-authz/pom.xml
+++ b/extensions/spark/kyuubi-spark-authz/pom.xml
@@ -380,6 +380,13 @@
<artifactId>${hudi.artifact}</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
index e25cd2a700..1fd86a3789 100644
---
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
+++
b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.plugin.spark.authz.ranger
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
import org.apache.ranger.plugin.policyengine.RangerAccessRequest
import org.apache.spark.sql.SparkSession
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.kyuubi.plugin.spark.authz._
import org.apache.kyuubi.plugin.spark.authz.ObjectType._
+import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType.AccessType
import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._
import org.apache.kyuubi.plugin.spark.authz.rule.Authorization
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
@@ -34,15 +35,19 @@ case class RuleAuthorization(spark: SparkSession) extends
Authorization(spark) {
val auditHandler = new SparkRangerAuditHandler
val ugi = getAuthzUgi(spark.sparkContext)
val (inputs, outputs, opType) = PrivilegesBuilder.build(plan, spark)
- val requests = new ArrayBuffer[AccessRequest]()
+
+ // Deduplicate on (AccessResource, AccessType): `requestsSet` provides hash-based
+ // duplicate checks, while `requests` keeps the unique requests in input order.
+ val requests = new mutable.ArrayBuffer[AccessRequest]()
+ val requestsSet = new mutable.HashSet[(AccessResource, AccessType)]()
def addAccessRequest(objects: Iterable[PrivilegeObject], isInput: Boolean): Unit = {
objects.foreach { obj =>
val resource = AccessResource(obj, opType)
val accessType = ranger.AccessType(obj, opType, isInput)
- if (accessType != AccessType.NONE && !requests.exists(o =>
- o.accessType == accessType && o.getResource == resource)) {
+ // `HashSet.add` returns false when the (resource, accessType) key was already
+ // recorded, so one hash lookup both detects a duplicate and remembers a new key.
+ // The explicit tuple avoids relying on argument auto-tupling.
+ // NOTE(review): relies on AccessResource having a hashCode consistent with its
+ // equals (the old linear scan only needed equals) - confirm.
+ if (accessType != AccessType.NONE && requestsSet.add((resource, accessType))) {
requests += AccessRequest(resource, ugi, opType, accessType)
}
}
}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala
new file mode 100644
index 0000000000..b1be2c9577
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.benchmark
+
+import java.io.{File, FileOutputStream, OutputStream}
+
+import scala.collection.JavaConverters._
+
+import com.google.common.reflect.ClassPath
+import org.scalatest.Assertions._
+
+/**
+ * Mix-in for benchmark suites that writes Spark `Benchmark` output to a per-JDK results
+ * file under the owning module's `benchmarks/` directory.
+ */
+trait KyuubiBenchmarkBase {
+
+  /**
+   * Sink for benchmark results. It is `Some` only while [[withHeader]] is running, and
+   * is meant to be passed as the `output` of a Spark `Benchmark`.
+   */
+  var output: Option[OutputStream] = None
+
+  /**
+   * Canonical path, with a trailing "/", of the module directory that owns the benchmark
+   * classes. It is derived from the code-source location of the first top-level
+   * `org.apache.spark.sql` class whose name ends with "Benchmark", stepping out of
+   * `target` when the classes live under it.
+   */
+  private val prefix = {
+    val benchmarkClasses = ClassPath.from(Thread.currentThread.getContextClassLoader)
+      .getTopLevelClassesRecursive("org.apache.spark.sql").asScala.toArray
+    assert(benchmarkClasses.nonEmpty)
+    // Match on ClassInfo.getName so that only the selected class is loaded, rather than
+    // every top-level class under org.apache.spark.sql.
+    val benchmark = benchmarkClasses.find(_.getName.endsWith("Benchmark"))
+      .getOrElse(fail("No *Benchmark class found under package org.apache.spark.sql"))
+    val targetDirOrProjDir =
+      new File(benchmark.load().getProtectionDomain.getCodeSource.getLocation.toURI)
+        .getParentFile.getParentFile
+    if (targetDirOrProjDir.getName == "target") {
+      targetDirOrProjDir.getParentFile.getCanonicalPath + "/"
+    } else {
+      targetDirOrProjDir.getCanonicalPath + "/"
+    }
+  }
+
+  /**
+   * Runs `func` with [[output]] pointing at this suite's results file,
+   * `<module>/benchmarks/<SimpleClassName>[-jdkN]-results.txt`. The `-jdkN` suffix is
+   * added only when the major Java version is greater than 8. An existing file is
+   * truncated. The stream is closed and [[output]] is reset to `None` even if `func` throws.
+   *
+   * @param func the benchmark body; it should read [[output]] to obtain the sink
+   */
+  def withHeader(func: => Unit): Unit = {
+    val version = System.getProperty("java.version").split("\\D+")(0).toInt
+    val jdkString = if (version > 8) s"-jdk$version" else ""
+    val resultFileName =
+      s"${this.getClass.getSimpleName.replace("$", "")}$jdkString-results.txt"
+    val dir = new File(s"${prefix}benchmarks/")
+    if (!dir.exists()) {
+      // scalastyle:off println
+      println(s"Creating ${dir.getAbsolutePath} for benchmark results.")
+      // scalastyle:on println
+      dir.mkdirs()
+    }
+    // FileOutputStream creates the file when missing and truncates it otherwise.
+    val out = new FileOutputStream(new File(dir, resultFileName))
+    output = Some(out)
+    try {
+      func
+    } finally {
+      output = None
+      out.close()
+    }
+  }
+}
diff --git
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala
new file mode 100644
index 0000000000..4286d2fca3
--- /dev/null
+++
b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+import java.nio.file.Files
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.reflect.io.Path.jfile2path
+
+import org.apache.spark.benchmark.Benchmark
+import org.scalatest.BeforeAndAfterAll
+// scalastyle:off
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider
+import org.apache.kyuubi.plugin.spark.authz.benchmark.KyuubiBenchmarkBase
+import org.apache.kyuubi.plugin.spark.authz.ranger.RuleAuthorization
+import org.apache.kyuubi.util.ThreadUtils
+
+/**
+ * Benchmark to measure performance of collecting ranger access requests.
+ *
+ * The test is skipped unless the `RUN_BENCHMARK` environment variable is set. Results are
+ * written through [[KyuubiBenchmarkBase.withHeader]].
+ *
+ * {{{
+ *   RUN_BENCHMARK=1 ./build/mvn clean test \
+ *   -pl extensions/spark/kyuubi-spark-authz -am \
+ *   -Dtest=none -DwildcardSuites=org.apache.spark.sql.RuleAuthorizationBenchmark
+ * }}}
+ */
+class RuleAuthorizationBenchmark extends AnyFunSuite
+  with SparkSessionProvider with BeforeAndAfterAll
+  with KyuubiBenchmarkBase {
+  // scalastyle:on
+
+  override protected val catalogImpl: String = "hive"
+  private val runBenchmark = sys.env.contains("RUN_BENCHMARK")
+
+  private val fileNumbers = 50000
+
+  private var tempDir: File = _
+
+  override def beforeAll(): Unit = {
+    // Keep stackable BeforeAndAfterAll setup from other mixed-in traits.
+    super.beforeAll()
+    tempDir = Files.createTempDirectory("kyuubi-test-").toFile
+  }
+
+  override def afterAll(): Unit = {
+    // Run the parent teardown even if local cleanup throws.
+    try {
+      if (tempDir != null) {
+        tempDir.deleteRecursively()
+      }
+      spark.stop()
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  test("Collecting files ranger access request") {
+    assume(runBenchmark)
+
+    // Create `fileNumbers` empty input files concurrently and wait for all of them.
+    val futures = (1 to fileNumbers).map { i =>
+      Future {
+        new File(tempDir, s"file_$i.txt").createNewFile()
+      }
+    }
+    ThreadUtils.awaitResult(Future.sequence(futures), Duration.Inf)
+
+    // A single glob read over all files; the plan is built once, outside the timed region.
+    val df = spark.read.text(new File(tempDir, "file_*.txt").getPath)
+    val plan = df.queryExecution.optimizedPlan
+
+    withHeader {
+      // Report throughput per input file. A valuesPerIteration of -1 produced negative
+      // Rate and Per Row figures.
+      val benchmark = new Benchmark(
+        "Collecting files ranger access request",
+        fileNumbers,
+        output = output)
+      benchmark.addCase(s"$fileNumbers files benchmark", 3) { _ =>
+        RuleAuthorization(spark).checkPrivileges(spark, plan)
+      }
+      benchmark.run()
+    }
+  }
+}