This is an automated email from the ASF dual-hosted git repository.

bowenliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 04f443792b [KYUUBI #6754][AUTHZ] Improve the performance of Ranger access requests deduplication
04f443792b is described below

commit 04f443792b4fad59b43454f2ac7f2413e9221d47
Author: wankunde <[email protected]>
AuthorDate: Mon Oct 21 21:17:51 2024 +0800

    [KYUUBI #6754][AUTHZ] Improve the performance of Ranger access requests deduplication
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #6754
    
    ## Describe Your Solution ๐Ÿ”ง
    
    Currently, RuleAuthorization collects access requests in an ArrayBuffer, which is slow
    because every new PrivilegeObject has to be compared against all previously collected
    requests, making deduplication quadratic. This change adds a HashSet keyed on
    (AccessResource, AccessType) so the duplicate check becomes constant time, while the
    ArrayBuffer still preserves the original request order.
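
    A minimal sketch of this pattern, using simplified placeholder types rather than the actual plugin classes:

    ```scala
    import scala.collection.mutable

    object DedupSketch {
      // Hypothetical stand-ins for the plugin's AccessResource / AccessType / AccessRequest.
      final case class Resource(name: String)
      final case class Request(resource: Resource, accessType: String)

      val requests = new mutable.ArrayBuffer[Request]()            // preserves input order
      private val seen = new mutable.HashSet[(Resource, String)]() // O(1) duplicate lookup

      def addIfNew(resource: Resource, accessType: String): Unit = {
        val key = (resource, accessType)
        // Constant-time membership check instead of scanning the whole buffer.
        if (!seen.contains(key)) {
          requests += Request(resource, accessType)
          seen += key
        }
      }
    }
    ```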
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    #### Behavior Without This Pull Request :coffin:
    
    Added a benchmark for collecting Ranger access requests.

    Before this change:
    ```sh
    Java HotSpot(TM) 64-Bit Server VM 17.0.12+8-LTS-286 on Mac OS X 14.6
    Apple M3
    Collecting files ranger access request:   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------------------------------
    50000 files benchmark                            181863         189434         NaN         -0.0 -181863368958.0       1.0X
    ```
    
    #### Behavior With This Pull Request :tada:
    
    After this change:
    ```sh
    Java HotSpot(TM) 64-Bit Server VM 17.0.12+8-LTS-286 on Mac OS X 14.6
    Apple M3
    Collecting files ranger access request:   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------------------------------
    50000 files benchmark                              1281           1310            33         -0.0 -1280563000.0       1.0X
    ```
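
    For reference, the numbers above come from the new RuleAuthorizationBenchmark suite added in this patch; per its scaladoc, it can be run with:

    ```sh
    RUN_BENCHMARK=1 ./build/mvn clean test \
      -pl extensions/spark/kyuubi-spark-authz -am \
      -Dtest=none -DwildcardSuites=org.apache.spark.sql.RuleAuthorizationBenchmark
    ```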
    
    #### Related Unit Tests
    
    Existing unit tests.
    
    ---
    
    # Checklist 📝
    
    - [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6758 from wankunde/ranger2.
    
    Closes #6754
    
    9d7d1964b [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
    88b9c049b [wankun] Update extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala
    20c55fbeb [wankun] Update extensions/spark/kyuubi-spark-authz/pom.xml
    f5a3b6ca5 [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
    9793249de [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
    d86b01f9c [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
    b904b491b [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
    aad08a6bb [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
    1374604bc [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
    01e15c149 [wankun] Update extensions/spark/kyuubi-spark-authz/pom.xml
    805e8a9c0 [wankun] Update extensions/spark/kyuubi-spark-authz/pom.xml
    e19817943 [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
    
    Lead-authored-by: wankunde <[email protected]>
    Co-authored-by: wankun <[email protected]>
    Signed-off-by: Bowen Liang <[email protected]>
---
 .../RuleAuthorizationBenchmark-jdk17-results.txt   |  6 ++
 extensions/spark/kyuubi-spark-authz/pom.xml        |  7 ++
 .../spark/authz/ranger/RuleAuthorization.scala     | 13 ++-
 .../authz/benchmark/KyuubiBenchmarkBase.scala      | 71 ++++++++++++++++
 .../spark/sql/RuleAuthorizationBenchmark.scala     | 94 ++++++++++++++++++++++
 5 files changed, 187 insertions(+), 4 deletions(-)

diff --git a/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt b/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt
new file mode 100644
index 0000000000..87f7c6e3dd
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/benchmarks/RuleAuthorizationBenchmark-jdk17-results.txt
@@ -0,0 +1,6 @@
+Java HotSpot(TM) 64-Bit Server VM 17.0.12+8-LTS-286 on Mac OS X 14.6
+Apple M3
+Collecting files ranger access request:   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+50000 files benchmark                              1281           1310            33         -0.0 -1280563000.0       1.0X
+
diff --git a/extensions/spark/kyuubi-spark-authz/pom.xml b/extensions/spark/kyuubi-spark-authz/pom.xml
index 8ab3720b13..b3970cc2c8 100644
--- a/extensions/spark/kyuubi-spark-authz/pom.xml
+++ b/extensions/spark/kyuubi-spark-authz/pom.xml
@@ -380,6 +380,13 @@
             <artifactId>${hudi.artifact}</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
index e25cd2a700..1fd86a3789 100644
--- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
+++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.plugin.spark.authz.ranger
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable
 
 import org.apache.ranger.plugin.policyengine.RangerAccessRequest
 import org.apache.spark.sql.SparkSession
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 import org.apache.kyuubi.plugin.spark.authz._
 import org.apache.kyuubi.plugin.spark.authz.ObjectType._
+import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType.AccessType
 import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._
 import org.apache.kyuubi.plugin.spark.authz.rule.Authorization
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
@@ -34,15 +35,19 @@ case class RuleAuthorization(spark: SparkSession) extends Authorization(spark) {
     val auditHandler = new SparkRangerAuditHandler
     val ugi = getAuthzUgi(spark.sparkContext)
     val (inputs, outputs, opType) = PrivilegesBuilder.build(plan, spark)
-    val requests = new ArrayBuffer[AccessRequest]()
+
+    // Use a HashSet to deduplicate requests that share the same AccessResource and AccessType;
+    // `requests` keeps every non-duplicate request in the same order as the input.
+    val requests = new mutable.ArrayBuffer[AccessRequest]()
+    val requestsSet = new mutable.HashSet[(AccessResource, AccessType)]()
 
     def addAccessRequest(objects: Iterable[PrivilegeObject], isInput: Boolean): Unit = {
       objects.foreach { obj =>
         val resource = AccessResource(obj, opType)
         val accessType = ranger.AccessType(obj, opType, isInput)
-        if (accessType != AccessType.NONE && !requests.exists(o =>
-            o.accessType == accessType && o.getResource == resource)) {
+        if (accessType != AccessType.NONE && !requestsSet.contains((resource, accessType))) {
           requests += AccessRequest(resource, ugi, opType, accessType)
+          requestsSet.add((resource, accessType))
         }
       }
     }
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala
new file mode 100644
index 0000000000..b1be2c9577
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/benchmark/KyuubiBenchmarkBase.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.benchmark
+
+import java.io.{File, FileOutputStream, OutputStream}
+
+import scala.collection.JavaConverters._
+
+import com.google.common.reflect.ClassPath
+import org.scalatest.Assertions._
+
+trait KyuubiBenchmarkBase {
+  var output: Option[OutputStream] = None
+
+  private val prefix = {
+    val benchmarkClasses = ClassPath.from(Thread.currentThread.getContextClassLoader)
+      .getTopLevelClassesRecursive("org.apache.spark.sql").asScala.toArray
+    assert(benchmarkClasses.nonEmpty)
+    val benchmark = benchmarkClasses.find(_.load().getName.endsWith("Benchmark"))
+    val targetDirOrProjDir =
+      new File(benchmark.get.load().getProtectionDomain.getCodeSource.getLocation.toURI)
+        .getParentFile.getParentFile
+    if (targetDirOrProjDir.getName == "target") {
+      targetDirOrProjDir.getParentFile.getCanonicalPath + "/"
+    } else {
+      targetDirOrProjDir.getCanonicalPath + "/"
+    }
+  }
+
+  def withHeader(func: => Unit): Unit = {
+    val version = System.getProperty("java.version").split("\\D+")(0).toInt
+    val jdkString = if (version > 8) s"-jdk$version" else ""
+    val resultFileName =
+      s"${this.getClass.getSimpleName.replace("$", "")}$jdkString-results.txt"
+    val dir = new File(s"${prefix}benchmarks/")
+    if (!dir.exists()) {
+      // scalastyle:off println
+      println(s"Creating ${dir.getAbsolutePath} for benchmark results.")
+      // scalastyle:on println
+      dir.mkdirs()
+    }
+    val file = new File(dir, resultFileName)
+    if (!file.exists()) {
+      file.createNewFile()
+    }
+    output = Some(new FileOutputStream(file))
+
+    func
+
+    output.foreach { o =>
+      if (o != null) {
+        o.close()
+      }
+    }
+  }
+}
diff --git a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala
new file mode 100644
index 0000000000..4286d2fca3
--- /dev/null
+++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+import java.nio.file.Files
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.reflect.io.Path.jfile2path
+
+import org.apache.spark.benchmark.Benchmark
+import org.scalatest.BeforeAndAfterAll
+// scalastyle:off
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider
+import org.apache.kyuubi.plugin.spark.authz.benchmark.KyuubiBenchmarkBase
+import org.apache.kyuubi.plugin.spark.authz.ranger.RuleAuthorization
+import org.apache.kyuubi.util.ThreadUtils
+
+/**
+ * Benchmark to measure performance of collecting ranger access requests.
+ *
+ * {{{
+ *   RUN_BENCHMARK=1 ./build/mvn clean test \
+ *   -pl extensions/spark/kyuubi-spark-authz -am \
+ *   -Dtest=none -DwildcardSuites=org.apache.spark.sql.RuleAuthorizationBenchmark
+ * }}}
+ */
+class RuleAuthorizationBenchmark extends AnyFunSuite
+  with SparkSessionProvider with BeforeAndAfterAll
+  with KyuubiBenchmarkBase {
+  // scalastyle:on
+
+  override protected val catalogImpl: String = "hive"
+  private val runBenchmark = sys.env.contains("RUN_BENCHMARK")
+
+  private val fileNumbers = 50000
+
+  private var tempDir: File = _
+
+  override def beforeAll(): Unit = {
+    tempDir = Files.createTempDirectory("kyuubi-test-").toFile
+  }
+
+  override def afterAll(): Unit = {
+    if (tempDir != null) {
+      tempDir.deleteRecursively()
+    }
+    spark.stop()
+    super.afterAll()
+  }
+
+  test("Collecting files ranger access request") {
+    assume(runBenchmark)
+
+    val futures = (1 to fileNumbers).map { i =>
+      Future {
+        val file = new File(tempDir, s"file_$i.txt")
+        file.createNewFile()
+      }
+    }
+    val allFutures = Future.sequence(futures)
+    ThreadUtils.awaitResult(allFutures, Duration.Inf)
+
+    val df = spark.read.text(tempDir + "/file_*.txt")
+    val plan = df.queryExecution.optimizedPlan
+
+    withHeader {
+      val benchmark = new Benchmark(s"Collecting files ranger access request", -1, output = output)
+      benchmark.addCase(s"$fileNumbers files benchmark", 3) { _ =>
+        RuleAuthorization(spark).checkPrivileges(spark, plan)
+      }
+      benchmark.run()
+    }
+  }
+}
