This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0adf4aecbc [GLUTEN-7685][VL][RAS] Add new cost model to avoid costly 
r2c (#7686)
0adf4aecbc is described below

commit 0adf4aecbcafbf0ad31157e2e62f35eb0b184ee7
Author: Mingliang Zhu <[email protected]>
AuthorDate: Tue Oct 29 15:07:07 2024 +0800

    [GLUTEN-7685][VL][RAS] Add new cost model to avoid costly r2c (#7686)
    
    Closes #7685
---
 .../execution/VeloxRoughCostModel2Suite.scala      | 65 ++++++++++++++++++
 .../gluten/planner/cost/GlutenCostModel.scala      |  5 +-
 .../gluten/planner/cost/RoughCostModel2.scala      | 78 ++++++++++++++++++++++
 .../scala/org/apache/gluten/GlutenConfig.scala     | 26 ++++++++
 4 files changed, 173 insertions(+), 1 deletion(-)

diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRoughCostModel2Suite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRoughCostModel2Suite.scala
new file mode 100644
index 0000000000..e29d3082a9
--- /dev/null
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRoughCostModel2Suite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.GlutenConfig
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.ProjectExec
+
+class VeloxRoughCostModel2Suite extends VeloxWholeStageTransformerSuite {
+  override protected val resourcePath: String = "/tpch-data-parquet-velox"
+  override protected val fileFormat: String = "parquet"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark
+      .range(100)
+      .selectExpr("cast(id % 3 as int) as c1", "id as c2", "array(id, id + 1) 
as c3")
+      .write
+      .format("parquet")
+      .saveAsTable("tmp1")
+  }
+
+  override protected def afterAll(): Unit = {
+    spark.sql("drop table tmp1")
+    super.afterAll()
+  }
+
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set(GlutenConfig.RAS_ENABLED.key, "true")
+    .set(GlutenConfig.RAS_COST_MODEL.key, "rough2")
+    .set(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key, "false")
+
+  test("fallback trivial project if its neighbor nodes fell back") {
+    withSQLConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false") {
+      runQueryAndCompare("select c1 as c3 from tmp1") {
+        checkSparkOperatorMatch[ProjectExec]
+      }
+    }
+  }
+
+  test("avoid adding r2c if r2c cost greater than native") {
+    withSQLConf(
+      GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false",
+      GlutenConfig.RAS_ROUGH2_SIZEBYTES_THRESHOLD.key -> "1") {
+      runQueryAndCompare("select array_contains(c3, 0) as list from tmp1") {
+        checkSparkOperatorMatch[ProjectExec]
+      }
+    }
+  }
+}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
index c4d7a42a26..0a58b9f69c 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
@@ -26,7 +26,10 @@ import org.apache.spark.sql.utils.ReflectionUtil
 object GlutenCostModel extends Logging {
   def find(): CostModel[SparkPlan] = {
     val aliases: Map[String, Class[_ <: CostModel[SparkPlan]]] =
-      Map("legacy" -> classOf[LegacyCostModel], "rough" -> 
classOf[RoughCostModel])
+      Map(
+        "legacy" -> classOf[LegacyCostModel],
+        "rough" -> classOf[RoughCostModel],
+        "rough2" -> classOf[RoughCostModel2])
     val aliasOrClass = GlutenConfig.getConf.rasCostModel
     val clazz: Class[_ <: CostModel[SparkPlan]] = if 
(aliases.contains(aliasOrClass)) {
       aliases(aliasOrClass)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel2.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel2.scala
new file mode 100644
index 0000000000..4452e77798
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel2.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.planner.cost
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike, 
RowToColumnarLike}
+import org.apache.gluten.utils.PlanUtil
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, 
NamedExpression}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
+
+/** Compared with rough, rough2 can be more precise to avoid the costly r2c. */
+class RoughCostModel2 extends LongCostModel {
+
+  private def getSizeFactor(plan: SparkPlan): Long = {
+    // Get the bytes size that the plan needs to consume.
+    val sizeBytes = plan match {
+      case _: DataSourceScanExec | _: DataSourceV2ScanExecBase => 
getStatSizeBytes(plan)
+      case _: LeafExecNode => 0L
+      case p => p.children.map(getStatSizeBytes).sum
+    }
+    sizeBytes / GlutenConfig.getConf.rasRough2SizeBytesThreshold
+  }
+
+  private def getStatSizeBytes(plan: SparkPlan): Long = {
+    plan match {
+      case a: AdaptiveSparkPlanExec => getStatSizeBytes(a.inputPlan)
+      case _ =>
+        plan.logicalLink match {
+          case Some(logicalPlan) => logicalPlan.stats.sizeInBytes.toLong
+          case _ => plan.children.map(getStatSizeBytes).sum
+        }
+    }
+  }
+
+  override def selfLongCostOf(node: SparkPlan): Long = {
+    val sizeFactor = getSizeFactor(node)
+    val opCost = node match {
+      case ProjectExec(projectList, _) if 
projectList.forall(isCheapExpression) =>
+        // Make trivial ProjectExec have the same cost as ProjectExecTransform 
to reduce unnecessary
+        // c2r and r2c.
+        1L
+      case ColumnarToRowExec(_) => 1L
+      case RowToColumnarExec(_) => 1L
+      case ColumnarToRowLike(_) => 1L
+      case RowToColumnarLike(_) =>
+        // If sizeBytes is less than the threshold, the cost of 
RowToColumnarLike is ignored.
+        if (sizeFactor == 0) 1L else GlutenConfig.getConf.rasRough2R2cCost
+      case p if PlanUtil.isGlutenColumnarOp(p) => 1L
+      case p if PlanUtil.isVanillaColumnarOp(p) => 
GlutenConfig.getConf.rasRough2VanillaCost
+      // Other row ops. Usually a vanilla row op.
+      case _ => GlutenConfig.getConf.rasRough2VanillaCost
+    }
+    opCost * Math.max(1, sizeFactor)
+  }
+
+  private def isCheapExpression(ne: NamedExpression): Boolean = ne match {
+    case Alias(_: Attribute, _) => true
+    case _: Attribute => true
+    case _ => false
+  }
+}
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala 
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index dfe7e4a082..789d57414d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -270,6 +270,12 @@ class GlutenConfig(conf: SQLConf) extends Logging {
 
   def rasCostModel: String = conf.getConf(RAS_COST_MODEL)
 
+  def rasRough2SizeBytesThreshold: Long = 
conf.getConf(RAS_ROUGH2_SIZEBYTES_THRESHOLD)
+
+  def rasRough2R2cCost: Long = conf.getConf(RAS_ROUGH2_R2C_COST)
+
+  def rasRough2VanillaCost: Long = conf.getConf(RAS_ROUGH2_VANILLA_COST)
+
   def enableVeloxCache: Boolean = conf.getConf(COLUMNAR_VELOX_CACHE_ENABLED)
 
   def veloxMemCacheSize: Long = conf.getConf(COLUMNAR_VELOX_MEM_CACHE_SIZE)
@@ -1366,6 +1372,26 @@ object GlutenConfig {
       .stringConf
       .createWithDefaultString("legacy")
 
+  val RAS_ROUGH2_SIZEBYTES_THRESHOLD =
+    buildConf("spark.gluten.ras.rough2.sizeBytesThreshold")
+      .doc(
+        "Experimental: Threshold of the byte size consumed by a SparkPlan; 
used as a coefficient " +
+          "to calculate cost in the RAS rough2 model")
+      .longConf
+      .createWithDefault(1073741824L)
+
+  val RAS_ROUGH2_R2C_COST =
+    buildConf("spark.gluten.ras.rough2.r2c.cost")
+      .doc("Experimental: Cost of RowToVeloxColumnarExec in RAS rough2 model")
+      .longConf
+      .createWithDefault(100L)
+
+  val RAS_ROUGH2_VANILLA_COST =
+    buildConf("spark.gluten.ras.rough2.vanilla.cost")
+      .doc("Experimental: Cost of vanilla Spark operator in RAS rough2 model")
+      .longConf
+      .createWithDefault(20L)
+
   // velox caching options.
   val COLUMNAR_VELOX_CACHE_ENABLED =
     buildStaticConf("spark.gluten.sql.columnar.backend.velox.cacheEnabled")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to