This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0adf4aecbc [GLUTEN-7685][VL][RAS] Add new cost model to avoid costly
r2c (#7686)
0adf4aecbc is described below
commit 0adf4aecbcafbf0ad31157e2e62f35eb0b184ee7
Author: Mingliang Zhu <[email protected]>
AuthorDate: Tue Oct 29 15:07:07 2024 +0800
[GLUTEN-7685][VL][RAS] Add new cost model to avoid costly r2c (#7686)
Closes #7685
---
.../execution/VeloxRoughCostModel2Suite.scala | 65 ++++++++++++++++++
.../gluten/planner/cost/GlutenCostModel.scala | 5 +-
.../gluten/planner/cost/RoughCostModel2.scala | 78 ++++++++++++++++++++++
.../scala/org/apache/gluten/GlutenConfig.scala | 26 ++++++++
4 files changed, 173 insertions(+), 1 deletion(-)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRoughCostModel2Suite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRoughCostModel2Suite.scala
new file mode 100644
index 0000000000..e29d3082a9
--- /dev/null
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxRoughCostModel2Suite.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.GlutenConfig
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.execution.ProjectExec
+
+/**
+ * Exercises the RAS "rough2" cost model against a small parquet table with an int column, a long
+ * column and an array column.
+ */
+class VeloxRoughCostModel2Suite extends VeloxWholeStageTransformerSuite {
+  override protected val resourcePath: String = "/tpch-data-parquet-velox"
+  override protected val fileFormat: String = "parquet"
+
+  /** Creates the `tmp1` table (100 rows: c1 int, c2 long, c3 array) used by every test. */
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark
+      .range(100)
+      .selectExpr("cast(id % 3 as int) as c1", "id as c2", "array(id, id + 1) as c3")
+      .write
+      .format("parquet")
+      .saveAsTable("tmp1")
+  }
+
+  /**
+   * Drops `tmp1`. `if exists` keeps cleanup from failing when table creation itself failed, and
+   * the `finally` guarantees the parent teardown still runs if the drop throws.
+   */
+  override protected def afterAll(): Unit = {
+    try {
+      spark.sql("drop table if exists tmp1")
+    } finally {
+      super.afterAll()
+    }
+  }
+
+  // Enable RAS with the rough2 cost model and turn off the vanilla vectorized readers.
+  override protected def sparkConf: SparkConf = super.sparkConf
+    .set(GlutenConfig.RAS_ENABLED.key, "true")
+    .set(GlutenConfig.RAS_COST_MODEL.key, "rough2")
+    .set(GlutenConfig.VANILLA_VECTORIZED_READERS_ENABLED.key, "false")
+
+  test("fallback trivial project if its neighbor nodes fell back") {
+    // With the columnar file scan disabled, a project that only renames a column is expected to
+    // stay a vanilla ProjectExec.
+    // NOTE(review): presumably checkSparkOperatorMatch asserts the operator is present in the
+    // executed plan - confirm against VeloxWholeStageTransformerSuite.
+    withSQLConf(GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false") {
+      runQueryAndCompare("select c1 as c3 from tmp1") {
+        checkSparkOperatorMatch[ProjectExec]
+      }
+    }
+  }
+
+  test("avoid adding r2c if r2c cost greater than native") {
+    // A 1-byte threshold makes the size factor non-zero for this table, so the r2c cost is
+    // charged and the non-trivial project is expected to stay a vanilla ProjectExec.
+    withSQLConf(
+      GlutenConfig.COLUMNAR_FILESCAN_ENABLED.key -> "false",
+      GlutenConfig.RAS_ROUGH2_SIZEBYTES_THRESHOLD.key -> "1") {
+      runQueryAndCompare("select array_contains(c3, 0) as list from tmp1") {
+        checkSparkOperatorMatch[ProjectExec]
+      }
+    }
+  }
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
index c4d7a42a26..0a58b9f69c 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/GlutenCostModel.scala
@@ -26,7 +26,10 @@ import org.apache.spark.sql.utils.ReflectionUtil
object GlutenCostModel extends Logging {
def find(): CostModel[SparkPlan] = {
val aliases: Map[String, Class[_ <: CostModel[SparkPlan]]] =
- Map("legacy" -> classOf[LegacyCostModel], "rough" ->
classOf[RoughCostModel])
+ Map(
+ "legacy" -> classOf[LegacyCostModel],
+ "rough" -> classOf[RoughCostModel],
+ "rough2" -> classOf[RoughCostModel2])
val aliasOrClass = GlutenConfig.getConf.rasCostModel
val clazz: Class[_ <: CostModel[SparkPlan]] = if
(aliases.contains(aliasOrClass)) {
aliases(aliasOrClass)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel2.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel2.scala
new file mode 100644
index 0000000000..4452e77798
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/planner/cost/RoughCostModel2.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.planner.cost
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.extension.columnar.transition.{ColumnarToRowLike,
RowToColumnarLike}
+import org.apache.gluten.utils.PlanUtil
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute,
NamedExpression}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase
+
+/**
+ * Compared with rough, rough2 can be more precise to avoid the costly r2c.
+ *
+ * A node's self cost is a per-operator base cost multiplied by a size factor derived from the
+ * logical-plan statistics of the data the node consumes. All size arithmetic is done in `BigInt`
+ * and saturated to `Long`, so very large statistics cannot wrap around into negative costs.
+ */
+class RoughCostModel2 extends LongCostModel {
+
+  /**
+   * Returns how many whole `rasRough2SizeBytesThreshold` units the plan's consumed bytes cover.
+   *
+   * Scans use their own statistics, other leaf nodes count as zero bytes, and every other node
+   * uses the sum of its children's statistics.
+   */
+  private def getSizeFactor(plan: SparkPlan): Long = {
+    // Get the bytes size that the plan needs to consume.
+    val sizeBytes: BigInt = plan match {
+      case _: DataSourceScanExec | _: DataSourceV2ScanExecBase => getStatSizeBytes(plan)
+      case _: LeafExecNode => BigInt(0)
+      case p => p.children.map(getStatSizeBytes).sum
+    }
+    // A threshold of 0 would make the division throw ArithmeticException; treat any
+    // non-positive setting as 1 byte.
+    val threshold = Math.max(1L, GlutenConfig.getConf.rasRough2SizeBytesThreshold)
+    toSaturatedLong(sizeBytes / BigInt(threshold))
+  }
+
+  /**
+   * Returns the estimated output size in bytes of `plan`, taken from its linked logical plan's
+   * statistics. Falls back to the sum over the children when no logical plan is linked, and
+   * looks through [[AdaptiveSparkPlanExec]] to its input plan.
+   */
+  private def getStatSizeBytes(plan: SparkPlan): BigInt = {
+    plan match {
+      case a: AdaptiveSparkPlanExec => getStatSizeBytes(a.inputPlan)
+      case _ =>
+        plan.logicalLink match {
+          // Kept as BigInt: converting with toLong would silently truncate large estimates.
+          case Some(logicalPlan) => logicalPlan.stats.sizeInBytes
+          case _ => plan.children.map(getStatSizeBytes).sum
+        }
+    }
+  }
+
+  /** Converts `value` to `Long`, clamping to the `Long` range instead of truncating bits. */
+  private def toSaturatedLong(value: BigInt): Long = {
+    if (value.isValidLong) {
+      value.toLong
+    } else if (value.signum < 0) {
+      Long.MinValue
+    } else {
+      Long.MaxValue
+    }
+  }
+
+  /**
+   * Returns the cost of `node` itself, excluding its children: the operator's base cost times
+   * the size factor (at least 1), saturated at `Long.MaxValue`.
+   */
+  override def selfLongCostOf(node: SparkPlan): Long = {
+    val sizeFactor = getSizeFactor(node)
+    val opCost = node match {
+      case ProjectExec(projectList, _) if projectList.forall(isCheapExpression) =>
+        // Give a trivial ProjectExec the same cost as ProjectExecTransformer to reduce
+        // unnecessary c2r and r2c.
+        1L
+      case ColumnarToRowExec(_) => 1L
+      case RowToColumnarExec(_) => 1L
+      case ColumnarToRowLike(_) => 1L
+      case RowToColumnarLike(_) =>
+        // If sizeBytes is less than the threshold, the cost of RowToColumnarLike is ignored.
+        if (sizeFactor == 0) 1L else GlutenConfig.getConf.rasRough2R2cCost
+      case p if PlanUtil.isGlutenColumnarOp(p) => 1L
+      case p if PlanUtil.isVanillaColumnarOp(p) => GlutenConfig.getConf.rasRough2VanillaCost
+      // Other row ops. Usually a vanilla row op.
+      case _ => GlutenConfig.getConf.rasRough2VanillaCost
+    }
+    val multiplier = Math.max(1L, sizeFactor)
+    // Saturate rather than let the product wrap around to a negative cost.
+    if (opCost > 0 && multiplier > Long.MaxValue / opCost) {
+      Long.MaxValue
+    } else {
+      opCost * multiplier
+    }
+  }
+
+  /** True for a bare attribute reference or an alias directly over one. */
+  private def isCheapExpression(ne: NamedExpression): Boolean = ne match {
+    case Alias(_: Attribute, _) => true
+    case _: Attribute => true
+    case _ => false
+  }
+}
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index dfe7e4a082..789d57414d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -270,6 +270,12 @@ class GlutenConfig(conf: SQLConf) extends Logging {
def rasCostModel: String = conf.getConf(RAS_COST_MODEL)
+  /** Value of `RAS_ROUGH2_SIZEBYTES_THRESHOLD` from the session conf. */
+  def rasRough2SizeBytesThreshold: Long = conf.getConf(RAS_ROUGH2_SIZEBYTES_THRESHOLD)
+
+  /** Value of `RAS_ROUGH2_R2C_COST` from the session conf. */
+  def rasRough2R2cCost: Long = conf.getConf(RAS_ROUGH2_R2C_COST)
+
+  /** Value of `RAS_ROUGH2_VANILLA_COST` from the session conf. */
+  def rasRough2VanillaCost: Long = conf.getConf(RAS_ROUGH2_VANILLA_COST)
+
def enableVeloxCache: Boolean = conf.getConf(COLUMNAR_VELOX_CACHE_ENABLED)
def veloxMemCacheSize: Long = conf.getConf(COLUMNAR_VELOX_MEM_CACHE_SIZE)
@@ -1366,6 +1372,26 @@ object GlutenConfig {
.stringConf
.createWithDefaultString("legacy")
+  // Divisor applied to a plan's estimated consumed bytes in the rough2 model (default 1 GiB).
+  val RAS_ROUGH2_SIZEBYTES_THRESHOLD =
+    buildConf("spark.gluten.ras.rough2.sizeBytesThreshold")
+      .doc(
+        "Experimental: Threshold of the byte size consumed by sparkPlan, coefficient used " +
+          "to calculate cost in RAS rough2 model")
+      .longConf
+      .createWithDefault(1073741824L)
+
+  // Base cost of a row-to-columnar transition in the rough2 model.
+  val RAS_ROUGH2_R2C_COST =
+    buildConf("spark.gluten.ras.rough2.r2c.cost")
+      .doc("Experimental: Cost of RowToVeloxColumnarExec in RAS rough2 model")
+      .longConf
+      .createWithDefault(100L)
+
+  // Base cost of a vanilla Spark operator in the rough2 model.
+  val RAS_ROUGH2_VANILLA_COST =
+    buildConf("spark.gluten.ras.rough2.vanilla.cost")
+      .doc("Experimental: Cost of vanilla spark operator in RAS rough2 model")
+      .longConf
+      .createWithDefault(20L)
+
// velox caching options.
val COLUMNAR_VELOX_CACHE_ENABLED =
buildStaticConf("spark.gluten.sql.columnar.backend.velox.cacheEnabled")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]