This is an automated email from the ASF dual-hosted git repository.

mingliang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 0a2bda218a [CORE] AQE: Move `GlutenCostEvaluator` from shim layer to `gluten-core` (#9882)
0a2bda218a is described below

commit 0a2bda218a0cfbfe42aeeb6e4e538e691afc5fca
Author: Hongze Zhang <[email protected]>
AuthorDate: Thu Jun 5 15:45:49 2025 +0100

    [CORE] AQE: Move `GlutenCostEvaluator` from shim layer to `gluten-core` (#9882)
---
 .../scala/org/apache/gluten/GlutenPlugin.scala     |  6 ++--
 .../spark/sql/execution/adaptive/GlutenCost.scala  |  1 +
 .../execution/adaptive/GlutenCostEvaluator.scala   | 30 +++++++++++++++----
 .../execution/adaptive/GlutenCostEvaluator.scala   | 32 --------------------
 .../execution/adaptive/GlutenCostEvaluator.scala   | 35 ----------------------
 .../execution/adaptive/GlutenCostEvaluator.scala   | 35 ----------------------
 6 files changed, 30 insertions(+), 109 deletions(-)

diff --git a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
index ff5ae59eef..611e4e398c 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/GlutenPlugin.scala
@@ -31,6 +31,7 @@ import org.apache.spark.api.plugin.{DriverPlugin, 
ExecutorPlugin, PluginContext,
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.softaffinity.SoftAffinityListener
+import org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator
 import org.apache.spark.sql.execution.ui.{GlutenSQLAppStatusListener, 
GlutenUIUtils}
 import org.apache.spark.sql.internal.{SparkConfigUtil, SQLConf}
 import org.apache.spark.sql.internal.StaticSQLConf.SPARK_SESSION_EXTENSIONS
@@ -181,8 +182,9 @@ private[gluten] class GlutenDriverPlugin extends 
DriverPlugin with Logging {
       GlutenConfig.COST_EVALUATOR_ENABLED.key,
       GlutenConfig.COST_EVALUATOR_ENABLED.defaultValue.get)
     if (enableGlutenCostEvaluator) {
-      val costEvaluator = 
"org.apache.spark.sql.execution.adaptive.GlutenCostEvaluator"
-      conf.set(SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key, costEvaluator)
+      conf.set(
+        SQLConf.ADAPTIVE_CUSTOM_COST_EVALUATOR_CLASS.key,
+        classOf[GlutenCostEvaluator].getName)
     }
 
     // check memory off-heap enabled and size.
diff --git 
a/shims/common/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCost.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCost.scala
similarity index 96%
rename from 
shims/common/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCost.scala
rename to 
gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCost.scala
index 9436f469ba..df0b6fafe9 100644
--- 
a/shims/common/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCost.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCost.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.adaptive
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SparkPlan
 
+/** Since https://github.com/apache/incubator-gluten/pull/6143. */
 class GlutenCost(val eval: CostEvaluator, val plan: SparkPlan) extends Cost {
   override def compare(that: Cost): Int = that match {
     case that: GlutenCost if plan eq that.plan =>
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
similarity index 52%
rename from 
shims/spark34/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
rename to 
gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
index 2feb660317..0d006a4848 100644
--- 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
@@ -20,16 +20,36 @@ import org.apache.gluten.config.GlutenConfig
 
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.util.{SparkVersionUtil, Utils}
 
-/** This [[CostEvaluator]] is to force use the new physical plan when cost is equal. */
+/**
+ * This [[CostEvaluator]] is to force use the new physical plan when cost is equal.
+ *
+ * Since https://github.com/apache/incubator-gluten/pull/6143.
+ */
 case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
+  private val ltSpark33: Boolean = {
+    
SparkVersionUtil.compareMajorMinorVersion(SparkVersionUtil.majorMinorVersion(), 
(3, 3)) < 0
+  }
+
+  private val vanillaCostEvaluator: CostEvaluator = {
+    if (ltSpark33) {
+      val clazz = 
Utils.classForName("org.apache.spark.sql.execution.adaptive.SimpleCostEvaluator$")
+      clazz.getDeclaredField("MODULE$").get(null).asInstanceOf[CostEvaluator]
+    } else {
+      val forceOptimizeSkewedJoin =
+        
conf.getConfString("spark.sql.adaptive.forceOptimizeSkewedJoin").toBoolean
+      val clazz = 
Utils.classForName("org.apache.spark.sql.execution.adaptive.SimpleCostEvaluator")
+      val ctor = clazz.getConstructor(classOf[Boolean])
+      
ctor.newInstance(forceOptimizeSkewedJoin.asInstanceOf[Object]).asInstanceOf[CostEvaluator]
+    }
+  }
+
   override def evaluateCost(plan: SparkPlan): Cost = {
-    val forceOptimizeSkewedJoin = 
conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
     if (GlutenConfig.get.enableGluten) {
-      new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
+      new GlutenCost(vanillaCostEvaluator, plan)
     } else {
-      SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)
+      vanillaCostEvaluator.evaluateCost(plan)
     }
   }
 }
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
 
b/shims/spark32/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
deleted file mode 100644
index ab2c1b2819..0000000000
--- 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.adaptive
-
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.sql.execution.SparkPlan
-
-/** This [[CostEvaluator]] is to force use the new physical plan when cost is 
equal. */
-case class GlutenCostEvaluator() extends CostEvaluator {
-  override def evaluateCost(plan: SparkPlan): Cost = {
-    if (GlutenConfig.get.enableGluten) {
-      new GlutenCost(SimpleCostEvaluator, plan)
-    } else {
-      SimpleCostEvaluator.evaluateCost(plan)
-    }
-  }
-}
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
deleted file mode 100644
index 2feb660317..0000000000
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.adaptive
-
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.internal.SQLConf
-
-/** This [[CostEvaluator]] is to force use the new physical plan when cost is 
equal. */
-case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
-  override def evaluateCost(plan: SparkPlan): Cost = {
-    val forceOptimizeSkewedJoin = 
conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
-    if (GlutenConfig.get.enableGluten) {
-      new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
-    } else {
-      SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)
-    }
-  }
-}
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
deleted file mode 100644
index 2feb660317..0000000000
--- 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/adaptive/GlutenCostEvaluator.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.adaptive
-
-import org.apache.gluten.config.GlutenConfig
-
-import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.internal.SQLConf
-
-/** This [[CostEvaluator]] is to force use the new physical plan when cost is 
equal. */
-case class GlutenCostEvaluator() extends CostEvaluator with SQLConfHelper {
-  override def evaluateCost(plan: SparkPlan): Cost = {
-    val forceOptimizeSkewedJoin = 
conf.getConf(SQLConf.ADAPTIVE_FORCE_OPTIMIZE_SKEWED_JOIN)
-    if (GlutenConfig.get.enableGluten) {
-      new GlutenCost(SimpleCostEvaluator(forceOptimizeSkewedJoin), plan)
-    } else {
-      SimpleCostEvaluator(forceOptimizeSkewedJoin).evaluateCost(plan)
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to