This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 81371c9462 [VL] RAS: A benchmark suite for performance of query optimization (#8339)
81371c9462 is described below

commit 81371c9462f68e40e9ccead4d43fa8781cdbbf59
Author: Hongze Zhang <[email protected]>
AuthorDate: Wed Dec 25 16:15:39 2024 +0800

    [VL] RAS: A benchmark suite for performance of query optimization (#8339)
---
 .../enumerated}/planner/VeloxRasSuite.scala        |   3 +-
 .../execution/benchmark/VeloxRasBenchmark.scala    | 165 +++++++++++++++++++++
 .../org/apache/gluten/ras/rule/RuleApplier.scala   |  28 ++--
 3 files changed, 176 insertions(+), 20 deletions(-)

diff --git a/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/enumerated/planner/VeloxRasSuite.scala
similarity index 98%
rename from backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
rename to backends-velox/src/test/scala/org/apache/gluten/extension/columnar/enumerated/planner/VeloxRasSuite.scala
index 885b549d0d..e3e006f7aa 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/extension/columnar/enumerated/planner/VeloxRasSuite.scala
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.planner
+package org.apache.gluten.extension.columnar.enumerated.planner
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.extension.columnar.enumerated.EnumeratedTransform
-import 
org.apache.gluten.extension.columnar.enumerated.planner.GlutenOptimization
 import 
org.apache.gluten.extension.columnar.enumerated.planner.cost.{LegacyCoster, 
LongCostModel}
 import org.apache.gluten.extension.columnar.enumerated.planner.property.Conv
 import org.apache.gluten.extension.columnar.transition.ConventionReq
diff --git a/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/VeloxRasBenchmark.scala b/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/VeloxRasBenchmark.scala
new file mode 100644
index 0000000000..10c002b8a0
--- /dev/null
+++ b/backends-velox/src/test/scala/org/apache/spark/sql/execution/benchmark/VeloxRasBenchmark.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.benchmark
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.execution.Table
+import org.apache.gluten.utils.Arm
+
+import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.internal.SQLConf
+
+import java.io.File
+
+import scala.concurrent.duration.DurationInt
+import scala.io.Source
+
+/**
+ * The benchmark measures RAS query optimization performance only. Performance of query execution
+ * is not considered.
+ */
+object VeloxRasBenchmark extends SqlBasedBenchmark {
+  // TPC-H query files (q1.sql ... q22.sql), located relative to the test classpath root.
+  private val tpchQueries: String =
+    getClass
+      .getResource("/")
+      .getPath + "../../../../tools/gluten-it/common/src/main/resources/tpch-queries"
+
+  // Root directory of the TPC-H Parquet data; each table is a sub-directory named after it.
+  private val dataDirPath: String =
+    getClass
+      .getResource("/tpch-data-parquet")
+      .getFile
+
+  /** TPC-H tables registered as temporary views before each timed iteration. */
+  private val tpchTables: Seq[Table] = Seq(
+    Table("part", partitionColumns = "p_brand" :: Nil),
+    Table("supplier", partitionColumns = Nil),
+    Table("partsupp", partitionColumns = Nil),
+    Table("customer", partitionColumns = "c_mktsegment" :: Nil),
+    Table("orders", partitionColumns = "o_orderdate" :: Nil),
+    Table("lineitem", partitionColumns = "l_shipdate" :: Nil),
+    Table("nation", partitionColumns = Nil),
+    Table("region", partitionColumns = Nil)
+  )
+
+  /** Builder shared by both planner cases; only the RAS switch is added on top of it. */
+  private def sessionBuilder() = {
+    SparkSession
+      .builder()
+      .master("local[1]")
+      .appName(this.getClass.getCanonicalName)
+      .config(SQLConf.SHUFFLE_PARTITIONS.key, 1)
+      .config(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, 1)
+      .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
+      .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .config("spark.ui.enabled", "false")
+      .config("spark.gluten.ui.enabled", "false")
+      .config("spark.memory.offHeap.enabled", "true")
+      .config("spark.memory.offHeap.size", "2g")
+      .config("spark.sql.adaptive.enabled", "false")
+  }
+
+  /** Cleans up any existing session, then creates a Gluten session with RAS on or off. */
+  private def createSession(rasEnabled: Boolean): SparkSession = {
+    SparkSession.cleanupAnyExistingSession()
+    sessionBuilder()
+      .config(GlutenConfig.RAS_ENABLED.key, rasEnabled)
+      .getOrCreate()
+  }
+
+  /** Registers each TPC-H table as a temporary view over its Parquet directory. */
+  private def createTpchTables(spark: SparkSession): Unit = {
+    tpchTables.foreach {
+      table =>
+        val tablePath = new File(dataDirPath, table.name).getAbsolutePath
+        spark.read.format("parquet").load(tablePath).createOrReplaceTempView(table.name)
+    }
+  }
+
+  /** Reads the SQL text of a TPC-H query (e.g. "q1") from the `tpchQueries` directory. */
+  private def tpchSQL(queryId: String): String =
+    Arm.withResource(Source.fromFile(new File(s"$tpchQueries/$queryId.sql"), "UTF-8"))(_.mkString)
+
+  // Explicit list; comment out entries to benchmark only a subset of the queries.
+  private val allQueryIds: Seq[String] = Seq(
+    "q1",
+    "q2",
+    "q3",
+    "q4",
+    "q5",
+    "q6",
+    "q7",
+    "q8",
+    "q9",
+    "q10",
+    "q11",
+    "q12",
+    "q13",
+    "q14",
+    "q15",
+    "q16",
+    "q17",
+    "q18",
+    "q19",
+    "q20",
+    "q21",
+    "q22"
+  )
+
+  /**
+   * Adds a timer case that plans every query in a freshly created session. Only the
+   * `spark.sql(...).queryExecution.executedPlan` calls are timed; session setup, table
+   * registration and plan printing are not.
+   */
+  private def addPlannerCase(
+      benchmark: Benchmark,
+      caseName: String,
+      logTag: String,
+      rasEnabled: Boolean,
+      queries: Seq[String]): Unit = {
+    benchmark.addTimerCase(caseName) {
+      timer =>
+        val spark = createSession(rasEnabled)
+        createTpchTables(spark)
+        timer.startTiming()
+        val plans = queries.map(sql => spark.sql(sql).queryExecution.executedPlan)
+        timer.stopTiming()
+        plans.foreach {
+          p =>
+            // scalastyle:off println
+            println(s"[$logTag] Optimized query plan: " + p.toString())
+            // scalastyle:on println
+        }
+    }
+  }
+
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    // Read the query texts once, up front, so file I/O never falls inside a timed iteration.
+    val queries = allQueryIds.map(tpchSQL)
+    val benchmark = new Benchmark(
+      this.getClass.getCanonicalName,
+      allQueryIds.size,
+      output = output,
+      warmupTime = 15.seconds,
+      minTime = 60.seconds)
+    addPlannerCase(benchmark, "RAS Planner", "RAS", rasEnabled = true, queries)
+    addPlannerCase(benchmark, "Legacy Planner", "Legacy", rasEnabled = false, queries)
+    benchmark.run()
+  }
+}
diff --git a/gluten-ras/common/src/main/scala/org/apache/gluten/ras/rule/RuleApplier.scala b/gluten-ras/common/src/main/scala/org/apache/gluten/ras/rule/RuleApplier.scala
index 3d94a99967..ed686c6ef7 100644
--- a/gluten-ras/common/src/main/scala/org/apache/gluten/ras/rule/RuleApplier.scala
+++ b/gluten-ras/common/src/main/scala/org/apache/gluten/ras/rule/RuleApplier.scala
@@ -31,14 +31,14 @@ trait RuleApplier[T <: AnyRef] {
 
 object RuleApplier {
   def apply[T <: AnyRef](ras: Ras[T], closure: Closure[T], rule: RasRule[T]): 
RuleApplier[T] = {
-    new ShapeAwareRuleApplier[T](ras, new RegularRuleApplier(ras, closure, 
rule))
+    new RegularRuleApplier(ras, closure, rule)
   }
 
   def apply[T <: AnyRef](
       ras: Ras[T],
       closure: Closure[T],
       rule: EnforcerRule[T]): RuleApplier[T] = {
-    new ShapeAwareRuleApplier[T](ras, new EnforcerRuleApplier[T](ras, closure, 
rule))
+    new EnforcerRuleApplier[T](ras, closure, rule)
   }
 
   private class RegularRuleApplier[T <: AnyRef](ras: Ras[T], closure: 
Closure[T], rule: RasRule[T])
@@ -46,6 +46,9 @@ object RuleApplier {
     private val deDup = mutable.Map[RasClusterKey, 
mutable.Set[UnsafeHashKey[T]]]()
 
     override def apply(icp: InClusterPath[T]): Unit = {
+      if (!shape.identify(icp.path())) {
+        return
+      }
       val cKey = icp.cluster()
       val path = icp.path()
       val plan = path.plan()
@@ -68,7 +71,7 @@ object RuleApplier {
       }
     }
 
-    override def shape(): Shape[T] = rule.shape()
+    override val shape: Shape[T] = rule.shape()
   }
 
   private class EnforcerRuleApplier[T <: AnyRef](
@@ -81,6 +84,9 @@ object RuleApplier {
     private val constraintDef = constraint.definition()
 
     override def apply(icp: InClusterPath[T]): Unit = {
+      if (!shape.identify(icp.path())) {
+        return
+      }
       val cKey = icp.cluster()
       val path = icp.path()
       val propSet = path.node().self().propSet()
@@ -108,20 +114,6 @@ object RuleApplier {
       }
     }
 
-    override def shape(): Shape[T] = rule.shape()
-  }
-
-  private class ShapeAwareRuleApplier[T <: AnyRef](ras: Ras[T], rule: 
RuleApplier[T])
-    extends RuleApplier[T] {
-    private val ruleShape = rule.shape()
-
-    override def apply(icp: InClusterPath[T]): Unit = {
-      if (!ruleShape.identify(icp.path())) {
-        return
-      }
-      rule.apply(icp)
-    }
-
-    override def shape(): Shape[T] = ruleShape
+    override val shape: Shape[T] = rule.shape()
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to