This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new dc4bc629a [VL] RAS: Add EnumeratedApplier to manage columnar rule
applications when ras is enabled (#5276)
dc4bc629a is described below
commit dc4bc629aaadbdbd285f6d0654643733a8542607
Author: Hongze Zhang <[email protected]>
AuthorDate: Sun Apr 7 12:06:02 2024 +0800
[VL] RAS: Add EnumeratedApplier to manage columnar rule applications when
ras is enabled (#5276)
---
.../org/apache/gluten/planner/VeloxRasSuite.scala | 2 +-
.../gluten/extension/ColumnarOverrides.scala | 265 +--------------------
.../columnar/ColumnarRuleApplier.scala} | 10 +-
.../extension/columnar/ColumnarTransitions.scala | 38 ++-
.../{transform => }/ImplementSingleNode.scala | 3 +-
.../extension/columnar/MiscColumnarRules.scala | 3 +-
.../columnar/RewriteSparkPlanRulesManager.scala | 18 +-
.../columnar/enumerated/EnumeratedApplier.scala | 173 ++++++++++++++
.../{ => enumerated}/EnumeratedTransform.scala | 4 +-
.../columnar/heuristic/HeuristicApplier.scala | 173 ++++++++++++++
.../extension/columnar/util/AdaptiveContext.scala | 93 ++++++++
.../apache/gluten/planner/GlutenOptimization.scala | 11 -
.../GlutenFormatWriterInjectsBase.scala | 5 +-
.../sql/execution/FallbackStrategiesSuite.scala | 13 +-
.../sql/execution/FallbackStrategiesSuite.scala | 13 +-
.../sql/execution/FallbackStrategiesSuite.scala | 13 +-
16 files changed, 534 insertions(+), 303 deletions(-)
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
index 4b1ee935f..cf600ff56 100644
---
a/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
+++
b/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
@@ -95,7 +95,7 @@ class VeloxRasSuite extends SharedSparkSession {
object VeloxRasSuite {
def newRas(): Ras[SparkPlan] = {
- GlutenOptimization().asInstanceOf[Ras[SparkPlan]]
+ // NOTE(review): `List()` is presumably the (empty) list of extra RAS rules, as taken by the
+ // `newRas(RasRules)` overload below — confirm against `GlutenOptimization.apply`.
+ GlutenOptimization(List()).asInstanceOf[Ras[SparkPlan]]
}
def newRas(RasRules: Seq[RasRule[SparkPlan]]): Ras[SparkPlan] = {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
index b664aa988..8ba51342a 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
@@ -17,12 +17,10 @@
package org.apache.gluten.extension
import org.apache.gluten.{GlutenConfig, GlutenSparkExtensionsInjector}
-import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.columnar._
-import
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
RemoveTopmostColumnarToRow, TransformPostOverrides, TransformPreOverrides}
-import org.apache.gluten.extension.columnar.transform._
-import org.apache.gluten.metrics.GlutenTimeMetric
-import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector, PlanUtil}
+import org.apache.gluten.extension.columnar.enumerated.EnumeratedApplier
+import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.utils.LogLevelUtil
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
@@ -30,68 +28,11 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive._
import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SparkRuleUtil
-
-import scala.collection.mutable.ListBuffer
-
-private[extension] object ColumnarToRowLike {
- def unapply(plan: SparkPlan): Option[SparkPlan] = {
- plan match {
- case c2r: ColumnarToRowTransition =>
- Some(c2r.child)
- case _ => None
- }
- }
-}
-// This rule will try to add RowToColumnarExecBase and ColumnarToRowExec
-// to support vanilla columnar operators.
-case class InsertColumnarToColumnarTransitions(session: SparkSession) extends
Rule[SparkPlan] {
- @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
-
- private def replaceWithVanillaColumnarToRow(p: SparkPlan): SparkPlan =
p.transformUp {
- case plan if PlanUtil.isGlutenColumnarOp(plan) =>
- plan.withNewChildren(plan.children.map {
- case child if PlanUtil.isVanillaColumnarOp(child) =>
- BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(
- ColumnarToRowExec(child))
- case other => other
- })
- }
-
- private def replaceWithVanillaRowToColumnar(p: SparkPlan): SparkPlan =
p.transformUp {
- case plan if PlanUtil.isVanillaColumnarOp(plan) =>
- plan.withNewChildren(plan.children.map {
- case child if PlanUtil.isGlutenColumnarOp(child) =>
- RowToColumnarExec(
-
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child))
- case other => other
- })
- }
-
- def apply(plan: SparkPlan): SparkPlan = {
- val newPlan =
replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan))
- planChangeLogger.logRule(ruleName, plan, newPlan)
- newPlan
- }
-}
object ColumnarOverrideRules {
- val GLUTEN_IS_ADAPTIVE_CONTEXT = "gluten.isAdaptiveContext"
-
- def rewriteSparkPlanRule(): Rule[SparkPlan] = {
- val rewriteRules = Seq(
- RewriteIn,
- RewriteMultiChildrenCount,
- RewriteCollect,
- RewriteTypedImperativeAggregate,
- PullOutPreProject,
- PullOutPostProject)
- new RewriteSparkPlanRulesManager(rewriteRules)
- }
// Utilities to infer columnar rule's caller's property:
// ApplyColumnarRulesAndInsertTransitions#outputsColumnar.
@@ -160,158 +101,6 @@ case class ColumnarOverrideRules(session: SparkSession)
import ColumnarOverrideRules._
- private lazy val transformPlanLogLevel =
GlutenConfig.getConf.transformPlanLogLevel
- @transient private lazy val planChangeLogger = new
PlanChangeLogger[SparkPlan]()
-
- // This is an empirical value, may need to be changed for supporting other
versions of spark.
- private val aqeStackTraceIndex = 18
-
- // Holds the original plan for possible entire fallback.
- private val localOriginalPlans: ThreadLocal[ListBuffer[SparkPlan]] =
- ThreadLocal.withInitial(() => ListBuffer.empty[SparkPlan])
- private val localIsAdaptiveContextFlags: ThreadLocal[ListBuffer[Boolean]] =
- ThreadLocal.withInitial(() => ListBuffer.empty[Boolean])
-
- // Do not create rules in class initialization as we should access SQLConf
- // while creating the rules. At this time SQLConf may not be there yet.
-
- // Just for test use.
- def enableAdaptiveContext(): ColumnarOverrideRules = {
- session.sparkContext.setLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT, "true")
- this
- }
-
- private def isAdaptiveContext: Boolean =
- Option(session.sparkContext.getLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT))
- .getOrElse("false")
- .toBoolean ||
- localIsAdaptiveContextFlags.get().head
-
- private def setAdaptiveContext(): Unit = {
- val traceElements = Thread.currentThread.getStackTrace
- assert(
- traceElements.length > aqeStackTraceIndex,
- s"The number of stack trace elements is expected to be more than
$aqeStackTraceIndex")
- // ApplyColumnarRulesAndInsertTransitions is called by either
QueryExecution or
- // AdaptiveSparkPlanExec. So by checking the stack trace, we can know
whether
- // columnar rule will be applied in adaptive execution context. This part
of code
- // needs to be carefully checked when supporting higher versions of spark
to make
- // sure the calling stack has not been changed.
- localIsAdaptiveContextFlags
- .get()
- .prepend(
- traceElements(aqeStackTraceIndex).getClassName
- .equals(AdaptiveSparkPlanExec.getClass.getName))
- }
-
- private def resetAdaptiveContext(): Unit =
- localIsAdaptiveContextFlags.get().remove(0)
-
- private def setOriginalPlan(plan: SparkPlan): Unit = {
- localOriginalPlans.get.prepend(plan)
- }
-
- private def originalPlan: SparkPlan = {
- val plan = localOriginalPlans.get.head
- assert(plan != null)
- plan
- }
-
- private def resetOriginalPlan(): Unit = localOriginalPlans.get.remove(0)
-
- /**
- * Rules to let planner create a suggested Gluten plan being sent to
`fallbackPolicies` in which
- * the plan will be breakdown and decided to be fallen back or not.
- */
- private def transformRules(outputsColumnar: Boolean): List[SparkSession =>
Rule[SparkPlan]] = {
-
- def maybeRas(outputsColumnar: Boolean): List[SparkSession =>
Rule[SparkPlan]] = {
- if (GlutenConfig.getConf.enableRas) {
- return List(
- (_: SparkSession) => TransformPreOverrides(List(ImplementFilter()),
List.empty),
- (session: SparkSession) => EnumeratedTransform(session,
outputsColumnar),
- (_: SparkSession) => RemoveTransitions,
- (_: SparkSession) => TransformPreOverrides(List.empty,
List(ImplementAggregate()))
- )
- }
- List((_: SparkSession) => TransformPreOverrides())
- }
-
- List(
- (_: SparkSession) => RemoveTransitions,
- (spark: SparkSession) => FallbackOnANSIMode(spark),
- (spark: SparkSession) => FallbackMultiCodegens(spark),
- (spark: SparkSession) => PlanOneRowRelation(spark),
- (_: SparkSession) => FallbackEmptySchemaRelation()
- ) :::
-
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules()
:::
- List(
- (spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark),
- (_: SparkSession) => rewriteSparkPlanRule(),
- (_: SparkSession) => AddTransformHintRule(),
- (_: SparkSession) => FallbackBloomFilterAggIfNeeded()
- ) :::
- maybeRas(outputsColumnar) :::
- List(
- (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
- (spark: SparkSession) => RewriteTransformer(spark),
- (_: SparkSession) => EnsureLocalSortRequirements,
- (_: SparkSession) => CollapseProjectExecTransformer
- ) :::
-
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules()
:::
- SparkRuleUtil
- .extendedColumnarRules(session,
GlutenConfig.getConf.extendedColumnarTransformRules) :::
- List((_: SparkSession) => InsertTransitions(outputsColumnar))
- }
-
- /**
- * Rules to add wrapper `FallbackNode`s on top of the input plan, as hints
to make planner fall
- * back the whole input plan to the original vanilla Spark plan.
- */
- private def fallbackPolicies(): List[SparkSession => Rule[SparkPlan]] = {
- List((_: SparkSession) => ExpandFallbackPolicy(isAdaptiveContext,
originalPlan))
- }
-
- /**
- * Rules applying to non-fallen-back Gluten plans. To do some post cleanup
works on the plan to
- * make sure it be able to run and be compatible with Spark's execution
engine.
- */
- private def postRules(): List[SparkSession => Rule[SparkPlan]] =
- List(
- (_: SparkSession) => TransformPostOverrides(),
- (s: SparkSession) => InsertColumnarToColumnarTransitions(s),
- (s: SparkSession) => RemoveTopmostColumnarToRow(s, isAdaptiveContext)
- ) :::
-
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules()
:::
- List((_: SparkSession) =>
ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
- SparkRuleUtil.extendedColumnarRules(session,
GlutenConfig.getConf.extendedColumnarPostRules)
-
- /*
- * Rules consistently applying to all input plans after all other rules have
been applied, despite
- * whether the input plan is fallen back or not.
- */
- private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
- List(
- // The rule is required despite whether the stage is fallen back or not.
Since
- // ColumnarCachedBatchSerializer is statically registered to Spark
without a columnar rule
- // when columnar table cache is enabled.
- (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s),
- (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s),
- (_: SparkSession) => RemoveTransformHintRule()
- )
- }
-
- private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = {
- setAdaptiveContext()
- setOriginalPlan(plan)
- try {
- f(plan)
- } finally {
- resetOriginalPlan()
- resetAdaptiveContext()
- }
- }
-
/**
* Note: Do not implement this API. We basically inject all of Gluten's
physical rules through
* `postColumnarTransitions`.
@@ -327,48 +116,14 @@ case class ColumnarOverrideRules(session: SparkSession)
val outputsColumnar = OutputsColumnarTester.inferOutputsColumnar(plan)
val unwrapped = OutputsColumnarTester.unwrap(plan)
val vanillaPlan = ColumnarTransitions.insertTransitions(unwrapped,
outputsColumnar)
- withTransformRules(transformRules(outputsColumnar)).apply(vanillaPlan)
+ val applier: ColumnarRuleApplier = if (GlutenConfig.getConf.enableRas) {
+ new EnumeratedApplier(session)
+ } else {
+ new HeuristicApplier(session)
+ }
+ applier.apply(vanillaPlan, outputsColumnar)
}
- // Visible for testing.
- def withTransformRules(transformRules: List[SparkSession =>
Rule[SparkPlan]]): Rule[SparkPlan] =
- plan =>
- PhysicalPlanSelector.maybe(session, plan) {
- val finalPlan = prepareFallback(plan) {
- p =>
- val suggestedPlan = transformPlan(transformRules, p, "transform")
- transformPlan(fallbackPolicies(), suggestedPlan, "fallback") match
{
- case FallbackNode(fallbackPlan) =>
- // we should use vanilla c2r rather than native c2r,
- // and there should be no `GlutenPlan` any more,
- // so skip the `postRules()`.
- fallbackPlan
- case plan =>
- transformPlan(postRules(), plan, "post")
- }
- }
- transformPlan(finalRules(), finalPlan, "final")
- }
-
- private def transformPlan(
- getRules: List[SparkSession => Rule[SparkPlan]],
- plan: SparkPlan,
- step: String) = GlutenTimeMetric.withMillisTime {
- logOnLevel(
- transformPlanLogLevel,
- s"${step}ColumnarTransitions preOverriden plan:\n${plan.toString}")
- val overridden = getRules.foldLeft(plan) {
- (p, getRule) =>
- val rule = getRule(session)
- val newPlan = rule(p)
- planChangeLogger.logRule(rule.ruleName, p, newPlan)
- newPlan
- }
- logOnLevel(
- transformPlanLogLevel,
- s"${step}ColumnarTransitions afterOverriden
plan:\n${overridden.toString}")
- overridden
- }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took:
$t ms."))
}
object ColumnarOverrides extends GlutenSparkExtensionsInjector {
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/planner/rule/GlutenRules.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
similarity index 83%
rename from
gluten-core/src/main/scala/org/apache/gluten/planner/rule/GlutenRules.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index 59250c563..17bf01730 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/planner/rule/GlutenRules.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -14,14 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.planner.rule
-
-import org.apache.gluten.ras.rule.RasRule
+package org.apache.gluten.extension.columnar
import org.apache.spark.sql.execution.SparkPlan
-object GlutenRules {
- def apply(): Seq[RasRule[SparkPlan]] = {
- List() // TODO
- }
+/**
+ * Applies Gluten's columnar rules to a Spark physical plan. `ColumnarOverrideRules` chooses the
+ * implementation based on `GlutenConfig.getConf.enableRas`.
+ */
+trait ColumnarRuleApplier {
+  /**
+   * @param plan physical plan to transform; `ColumnarOverrideRules` passes it with vanilla
+   *   row/columnar transitions already inserted
+   * @param outputsColumnar whether the root of the plan is expected to produce columnar output
+   *   (mirrors `ApplyColumnarRulesAndInsertTransitions#outputsColumnar`)
+   * @return the plan after Gluten's columnar rules have been applied
+   */
+  def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan
}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
index b6a1bd2c4..5dd266433 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
@@ -16,7 +16,11 @@
*/
package org.apache.gluten.extension.columnar
-import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.utils.PlanUtil
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
import org.apache.spark.sql.execution.{ApplyColumnarRulesAndInsertTransitions,
ColumnarToRowExec, ColumnarToRowTransition, RowToColumnarExec,
RowToColumnarTransition, SparkPlan}
/** See rule code from vanilla Spark:
[[ApplyColumnarRulesAndInsertTransitions]]. */
@@ -44,6 +48,38 @@ object RemoveTransitions extends Rule[SparkPlan] {
}
}
+/**
+ * Inserts conversions between Gluten columnar operators and vanilla Spark columnar operators.
+ * Both directions go through rows (presumably because the two columnar batch formats are not
+ * directly interchangeable — confirm):
+ *  - a vanilla columnar child under a Gluten columnar parent becomes
+ *    `genRowToColumnarExec(ColumnarToRowExec(child))`;
+ *  - a Gluten columnar child under a vanilla columnar parent becomes
+ *    `RowToColumnarExec(genColumnarToRowExec(child))`.
+ * `genRowToColumnarExec` / `genColumnarToRowExec` are supplied by the active backend through
+ * `BackendsApiManager`.
+ */
+case class InsertColumnarToColumnarTransitions(session: SparkSession) extends Rule[SparkPlan] {
+  @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
+
+  // Handles edges where a Gluten columnar parent consumes a vanilla columnar child.
+  private def replaceWithVanillaColumnarToRow(p: SparkPlan): SparkPlan = p.transformUp {
+    case plan if PlanUtil.isGlutenColumnarOp(plan) =>
+      plan.withNewChildren(plan.children.map {
+        case child if PlanUtil.isVanillaColumnarOp(child) =>
+          BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(
+            ColumnarToRowExec(child))
+        case other => other
+      })
+  }
+
+  // Handles edges where a vanilla columnar parent consumes a Gluten columnar child.
+  private def replaceWithVanillaRowToColumnar(p: SparkPlan): SparkPlan = p.transformUp {
+    case plan if PlanUtil.isVanillaColumnarOp(plan) =>
+      plan.withNewChildren(plan.children.map {
+        case child if PlanUtil.isGlutenColumnarOp(child) =>
+          RowToColumnarExec(
+            BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child))
+        case other => other
+      })
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    // Gluten-over-vanilla edges are rewritten first, then vanilla-over-Gluten edges.
+    val newPlan = replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan))
+    planChangeLogger.logRule(ruleName, plan, newPlan)
+    newPlan
+  }
+}
+
object ColumnarTransitions {
def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan
= {
InsertTransitions(outputsColumnar).apply(plan)
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transform/ImplementSingleNode.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ImplementSingleNode.scala
similarity index 99%
rename from
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transform/ImplementSingleNode.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ImplementSingleNode.scala
index 5820de270..3d0669de1 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transform/ImplementSingleNode.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ImplementSingleNode.scala
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.extension.columnar.transform
+package org.apache.gluten.extension.columnar
import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.BackendsApiManager
@@ -22,7 +22,6 @@ import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.TransformHints
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index aa3c54144..0fd0856f2 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -17,8 +17,7 @@
package org.apache.gluten.extension.columnar
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.ColumnarToRowLike
-import org.apache.gluten.extension.columnar.transform.{ImplementAggregate,
ImplementExchange, ImplementFilter, ImplementJoin, ImplementOthers,
ImplementSingleNode}
+import
org.apache.gluten.extension.columnar.ColumnarTransitions.ColumnarToRowLike
import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
import org.apache.spark.sql.SparkSession
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RewriteSparkPlanRulesManager.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RewriteSparkPlanRulesManager.scala
index 7de39d5fd..e694c23a7 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RewriteSparkPlanRulesManager.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RewriteSparkPlanRulesManager.scala
@@ -16,6 +16,8 @@
*/
package org.apache.gluten.extension.columnar
+import org.apache.gluten.extension.{RewriteCollect, RewriteIn}
+
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
@@ -41,7 +43,8 @@ case class RewrittenNodeWall(originalChild: SparkPlan)
extends LeafExecNode {
*
* Note that, this rule does not touch and tag these operators who does not
need to rewrite.
*/
-class RewriteSparkPlanRulesManager(rewriteRules: Seq[Rule[SparkPlan]]) extends
Rule[SparkPlan] {
+class RewriteSparkPlanRulesManager private (rewriteRules: Seq[Rule[SparkPlan]])
+ extends Rule[SparkPlan] {
private def mayNeedRewrite(plan: SparkPlan): Boolean = {
TransformHints.isTransformable(plan) && {
@@ -125,3 +128,16 @@ class RewriteSparkPlanRulesManager(rewriteRules:
Seq[Rule[SparkPlan]]) extends R
}
}
}
+
+/** Factory for `RewriteSparkPlanRulesManager`, whose constructor is private. */
+object RewriteSparkPlanRulesManager {
+  /**
+   * Creates a manager holding Gluten's built-in rewrite rules (passed in the order listed
+   * below). The result is typed as a plain `Rule[SparkPlan]`, so callers can add it to a rule
+   * list like any other rule.
+   */
+  def apply(): Rule[SparkPlan] = {
+    val rewriteRules = Seq(
+      RewriteIn,
+      RewriteMultiChildrenCount,
+      RewriteCollect,
+      RewriteTypedImperativeAggregate,
+      PullOutPreProject,
+      PullOutPostProject)
+    new RewriteSparkPlanRulesManager(rewriteRules)
+  }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
new file mode 100644
index 000000000..d58635e38
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.enumerated
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar._
+import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, TransformPostOverrides, TransformPreOverrides}
+import org.apache.gluten.extension.columnar.util.AdaptiveContext
+import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter, SparkPlan}
+import org.apache.spark.util.SparkRuleUtil
+
+/**
+ * `ColumnarRuleApplier` used when RAS is enabled (`GlutenConfig.getConf.enableRas`).
+ *
+ * Pipeline: `transformRules` produce a suggested plan, `fallbackPolicies` decide whether the
+ * whole plan falls back to vanilla Spark, `postRules` run only when it does not, and
+ * `finalRules` always run last. Compared with the non-RAS path, the operator transformation
+ * inside `transformRules` is performed by `EnumeratedTransform`.
+ */
+class EnumeratedApplier(session: SparkSession)
+  extends ColumnarRuleApplier
+  with Logging
+  with LogLevelUtil {
+
+  private lazy val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
+  private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]()
+
+  // Supplies the adaptive-execution flag and the original plan to `fallbackPolicies` and
+  // `postRules`; both are registered for the duration of `prepareFallback`.
+  private val adaptiveContext = AdaptiveContext(session)
+
+  override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
+    withTransformRules(transformRules(outputsColumnar)).apply(plan)
+
+  /**
+   * Wraps the whole pipeline for one plan into a single rule: transform rules, fallback
+   * policies, then either the fallback plan as-is or the post rules, and finally the final rules.
+   */
+  private def withTransformRules(
+      transformRules: List[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] =
+    plan =>
+      PhysicalPlanSelector.maybe(session, plan) {
+        val finalPlan = prepareFallback(plan) {
+          p =>
+            val suggestedPlan = transformPlan(transformRules, p, "transform")
+            transformPlan(fallbackPolicies(), suggestedPlan, "fallback") match {
+              case FallbackNode(fallbackPlan) =>
+                // we should use vanilla c2r rather than native c2r,
+                // and there should be no `GlutenPlan` any more,
+                // so skip the `postRules()`.
+                fallbackPlan
+              case plan =>
+                transformPlan(postRules(), plan, "post")
+            }
+        }
+        transformPlan(finalRules(), finalPlan, "final")
+      }
+
+  /**
+   * Applies `getRules` to `plan` one after another. Each rule is created from `session` just
+   * before it runs and its effect is reported through `PlanChangeLogger`; the plan before and
+   * after this step, and the step's duration, are logged at `transformPlanLogLevel`.
+   *
+   * @param step label used in the log messages, e.g. "transform", "fallback", "post" or "final"
+   */
+  private def transformPlan(
+      getRules: List[SparkSession => Rule[SparkPlan]],
+      plan: SparkPlan,
+      step: String) = GlutenTimeMetric.withMillisTime {
+    logOnLevel(
+      transformPlanLogLevel,
+      s"${step}ColumnarTransitions preOverriden plan:\n${plan.toString}")
+    val overridden = getRules.foldLeft(plan) {
+      (p, getRule) =>
+        val rule = getRule(session)
+        val newPlan = rule(p)
+        planChangeLogger.logRule(rule.ruleName, p, newPlan)
+        newPlan
+    }
+    logOnLevel(
+      transformPlanLogLevel,
+      s"${step}ColumnarTransitions afterOverriden plan:\n${overridden.toString}")
+    overridden
+  }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: $t ms."))
+
+  /**
+   * Runs `f` with the adaptive-context flag and the original `plan` registered in
+   * `adaptiveContext` (read by `fallbackPolicies` and `postRules`). Both are reset afterwards,
+   * including when `f` throws.
+   */
+  private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = {
+    adaptiveContext.setAdaptiveContext()
+    adaptiveContext.setOriginalPlan(plan)
+    try {
+      f(plan)
+    } finally {
+      adaptiveContext.resetOriginalPlan()
+      adaptiveContext.resetAdaptiveContext()
+    }
+  }
+
+  /**
+   * Rules that let the planner build a suggested Gluten plan. The plan is then handed to
+   * `fallbackPolicies`, which breaks it down and decides whether it falls back.
+   *
+   * RAS-specific part: an `ImplementFilter`-only `TransformPreOverrides` runs before
+   * `EnumeratedTransform`; `RemoveTransitions` and an `ImplementAggregate`-only one run after.
+   *
+   * The lists are joined with `:::` in a single expression. Each `:::` must stay at the end of
+   * its line. Otherwise Scala's semicolon inference ends the expression early, and every rule
+   * before the break is silently discarded.
+   */
+  private def transformRules(outputsColumnar: Boolean): List[SparkSession => Rule[SparkPlan]] = {
+    List(
+      (_: SparkSession) => RemoveTransitions,
+      (spark: SparkSession) => FallbackOnANSIMode(spark),
+      (spark: SparkSession) => FallbackMultiCodegens(spark),
+      (spark: SparkSession) => PlanOneRowRelation(spark),
+      (_: SparkSession) => FallbackEmptySchemaRelation()
+    ) :::
+      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules() :::
+      List(
+        (spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark),
+        (_: SparkSession) => RewriteSparkPlanRulesManager(),
+        (_: SparkSession) => AddTransformHintRule(),
+        (_: SparkSession) => FallbackBloomFilterAggIfNeeded()
+      ) :::
+      List(
+        (_: SparkSession) => TransformPreOverrides(List(ImplementFilter()), List.empty),
+        (spark: SparkSession) => EnumeratedTransform(spark, outputsColumnar),
+        (_: SparkSession) => RemoveTransitions,
+        (_: SparkSession) => TransformPreOverrides(List.empty, List(ImplementAggregate()))
+      ) :::
+      List(
+        (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
+        (spark: SparkSession) => RewriteTransformer(spark),
+        (_: SparkSession) => EnsureLocalSortRequirements,
+        (_: SparkSession) => CollapseProjectExecTransformer
+      ) :::
+      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules() :::
+      SparkRuleUtil
+        .extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarTransformRules) :::
+      List((_: SparkSession) => InsertTransitions(outputsColumnar))
+  }
+
+  /**
+   * Rules that add `FallbackNode` wrappers on top of the input plan. These act as hints for the
+   * planner to fall back the whole input plan to the original vanilla Spark plan.
+   */
+  private def fallbackPolicies(): List[SparkSession => Rule[SparkPlan]] = {
+    List(
+      (_: SparkSession) =>
+        ExpandFallbackPolicy(adaptiveContext.isAdaptiveContext(), adaptiveContext.originalPlan()))
+  }
+
+  /**
+   * Rules applied only to plans that were not entirely fallen back. They do post clean-up work
+   * so the plan can run on, and stay compatible with, Spark's execution engine.
+   */
+  private def postRules(): List[SparkSession => Rule[SparkPlan]] =
+    List(
+      (_: SparkSession) => TransformPostOverrides(),
+      (s: SparkSession) => InsertColumnarToColumnarTransitions(s),
+      (s: SparkSession) => RemoveTopmostColumnarToRow(s, adaptiveContext.isAdaptiveContext())
+    ) :::
+      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() :::
+      List((_: SparkSession) => ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
+      SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPostRules)
+
+  /**
+   * Rules applied to every input plan after all other rules, whether or not it fell back.
+   */
+  private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
+    List(
+      // Required whether or not the stage fell back: when the columnar table cache is enabled,
+      // ColumnarCachedBatchSerializer is statically registered with Spark without a columnar
+      // rule.
+      (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s),
+      (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s),
+      (_: SparkSession) => RemoveTransformHintRule()
+    )
+  }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnumeratedTransform.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
similarity index 93%
rename from
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnumeratedTransform.scala
rename to
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
index 832ff5883..0189f975d 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnumeratedTransform.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
@@ -14,9 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.extension.columnar
+package org.apache.gluten.extension.columnar.enumerated
-import org.apache.gluten.extension.columnar.transform.{ImplementExchange,
ImplementJoin, ImplementOthers, ImplementSingleNode}
+import org.apache.gluten.extension.columnar.{ImplementExchange, ImplementJoin,
ImplementOthers, ImplementSingleNode}
import org.apache.gluten.planner.GlutenOptimization
import org.apache.gluten.planner.property.GlutenProperties
import org.apache.gluten.ras.property.PropertySet
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
new file mode 100644
index 000000000..3fc2f325c
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.heuristic
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar._
+import
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
RemoveTopmostColumnarToRow, TransformPostOverrides, TransformPreOverrides}
+import org.apache.gluten.extension.columnar.util.AdaptiveContext
+import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages,
GlutenFallbackReporter, SparkPlan}
+import org.apache.spark.util.SparkRuleUtil
+
+/**
+ * Applies Gluten's columnar rules in fixed, heuristic phases: transform, fallback decision, post
+ * rules (only for plans that did not fall back as a whole) and final rules.
+ */
+class HeuristicApplier(session: SparkSession)
+  extends ColumnarRuleApplier
+  with Logging
+  with LogLevelUtil {
+
+  private lazy val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
+  private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]()
+
+  // Scoped adaptive-execution flag and original plan, registered by `prepareFallback` and read
+  // by the fallback policy and post rules below.
+  private val adaptiveContext = AdaptiveContext(session)
+
+  override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
+    withTransformRules(transformRules(outputsColumnar)).apply(plan)
+
+  /**
+   * Builds the complete rule pipeline around the given transform rules. Post rules are skipped
+   * when the fallback policies decide to fall back the whole plan; final rules always run.
+   *
+   * Visible for testing.
+   *
+   * NOTE: `AdaptiveContext.setAdaptiveContext()` inspects a fixed stack-frame index, so the call
+   * chain from here down to `prepareFallback` must not gain or lose frames.
+   */
+  def withTransformRules(
+      transformRules: List[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] =
+    plan =>
+      PhysicalPlanSelector.maybe(session, plan) {
+        val finalPlan = prepareFallback(plan) {
+          p =>
+            val suggestedPlan = transformPlan(transformRules, p, "transform")
+            transformPlan(fallbackPolicies(), suggestedPlan, "fallback") match {
+              case FallbackNode(fallbackPlan) =>
+                // We should use vanilla C2R rather than native C2R, and there should be no
+                // `GlutenPlan` left any more, so skip `postRules()`.
+                fallbackPlan
+              case glutenPlan =>
+                transformPlan(postRules(), glutenPlan, "post")
+            }
+        }
+        transformPlan(finalRules(), finalPlan, "final")
+      }
+
+  /**
+   * Applies the rules produced by `getRules` left to right. Each factory is invoked with
+   * `session` at application time. The plan is logged before and after the step, each rule's
+   * change is reported to the plan-change logger, and the step duration is logged.
+   *
+   * @param getRules rule factories, applied in list order
+   * @param plan the input plan
+   * @param step label used as a prefix in the log messages
+   * @return the plan after all rules have been applied
+   */
+  private def transformPlan(
+      getRules: List[SparkSession => Rule[SparkPlan]],
+      plan: SparkPlan,
+      step: String): SparkPlan = GlutenTimeMetric.withMillisTime {
+    logOnLevel(
+      transformPlanLogLevel,
+      s"${step}ColumnarTransitions preOverridden plan:\n${plan.toString}")
+    val overridden = getRules.foldLeft(plan) {
+      (current, getRule) =>
+        val rule = getRule(session)
+        val newPlan = rule(current)
+        planChangeLogger.logRule(rule.ruleName, current, newPlan)
+        newPlan
+    }
+    logOnLevel(
+      transformPlanLogLevel,
+      s"${step}ColumnarTransitions afterOverridden plan:\n${overridden.toString}")
+    overridden
+  }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: $t ms."))
+
+  /**
+   * Runs `f` on `plan` with the adaptive flag and the original plan registered in
+   * `adaptiveContext`. Both are unregistered afterwards, in reverse order, even if `f` throws.
+   */
+  private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = {
+    adaptiveContext.setAdaptiveContext()
+    adaptiveContext.setOriginalPlan(plan)
+    try {
+      f(plan)
+    } finally {
+      adaptiveContext.resetOriginalPlan()
+      adaptiveContext.resetAdaptiveContext()
+    }
+  }
+
+  /**
+   * Rules that let the planner create a suggested Gluten plan. That plan is then sent to
+   * `fallbackPolicies`, where it is broken down and a decision is made whether to fall it back.
+   * List order is application order.
+   *
+   * @param outputsColumnar forwarded to `InsertTransitions`, the last transform rule
+   */
+  private def transformRules(outputsColumnar: Boolean): List[SparkSession => Rule[SparkPlan]] = {
+    List(
+      (_: SparkSession) => RemoveTransitions,
+      (spark: SparkSession) => FallbackOnANSIMode(spark),
+      (spark: SparkSession) => FallbackMultiCodegens(spark),
+      (spark: SparkSession) => PlanOneRowRelation(spark),
+      (_: SparkSession) => FallbackEmptySchemaRelation()
+    ) :::
+      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules() :::
+      List(
+        (spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark),
+        (_: SparkSession) => RewriteSparkPlanRulesManager(),
+        (_: SparkSession) => AddTransformHintRule(),
+        (_: SparkSession) => FallbackBloomFilterAggIfNeeded()
+      ) :::
+      List((_: SparkSession) => TransformPreOverrides()) :::
+      List(
+        (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
+        (spark: SparkSession) => RewriteTransformer(spark),
+        (_: SparkSession) => EnsureLocalSortRequirements,
+        (_: SparkSession) => CollapseProjectExecTransformer
+      ) :::
+      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules() :::
+      SparkRuleUtil
+        .extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarTransformRules) :::
+      List((_: SparkSession) => InsertTransitions(outputsColumnar))
+  }
+
+  /**
+   * Rules that add wrapper `FallbackNode`s on top of the input plan. These act as hints that make
+   * the planner fall back the whole input plan to the original vanilla Spark plan.
+   */
+  private def fallbackPolicies(): List[SparkSession => Rule[SparkPlan]] = {
+    List(
+      // The factory runs inside `prepareFallback`, while the adaptive context is registered.
+      (_: SparkSession) =>
+        ExpandFallbackPolicy(adaptiveContext.isAdaptiveContext(), adaptiveContext.originalPlan()))
+  }
+
+  /**
+   * Rules applied to Gluten plans that did not fall back. They do post-cleanup work on the plan
+   * to make sure it is able to run and is compatible with Spark's execution engine.
+   */
+  private def postRules(): List[SparkSession => Rule[SparkPlan]] =
+    List(
+      (_: SparkSession) => TransformPostOverrides(),
+      (s: SparkSession) => InsertColumnarToColumnarTransitions(s),
+      (s: SparkSession) => RemoveTopmostColumnarToRow(s, adaptiveContext.isAdaptiveContext())
+    ) :::
+      BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() :::
+      List((_: SparkSession) => ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
+      SparkRuleUtil.extendedColumnarRules(session, GlutenConfig.getConf.extendedColumnarPostRules)
+
+  /**
+   * Rules applied to every input plan after all other rules have run, regardless of whether the
+   * plan fell back or not.
+   */
+  private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
+    List(
+      // The rule is required whether or not the stage falls back, since
+      // ColumnarCachedBatchSerializer is statically registered to Spark without a columnar rule
+      // when columnar table cache is enabled.
+      (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s),
+      (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s),
+      (_: SparkSession) => RemoveTransformHintRule()
+    )
+  }
+
+  /** Forces the adaptive-context flag on through `adaptiveContext`. Just for test use. */
+  def enableAdaptiveContext(): HeuristicApplier = {
+    adaptiveContext.enableAdaptiveContext()
+    this
+  }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
new file mode 100644
index 000000000..0592a3aca
--- /dev/null
+++
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.util
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Scoped state consulted while columnar rules are applied. It records whether the current
+ * application runs inside adaptive query execution, and the original plan kept for a possible
+ * whole-plan fallback. Each `set*` call is expected to be paired with its matching `reset*` call.
+ */
+sealed trait AdaptiveContext {
+  // ---- Adaptive-execution flag ----
+  def setAdaptiveContext(): Unit
+  def isAdaptiveContext(): Boolean
+  def resetAdaptiveContext(): Unit
+
+  // ---- Original plan kept for a possible whole-plan fallback ----
+  def setOriginalPlan(plan: SparkPlan): Unit
+  def originalPlan(): SparkPlan
+  def resetOriginalPlan(): Unit
+
+  // ---- Testing ----
+  // Forces `isAdaptiveContext()` to report true.
+  def enableAdaptiveContext(): Unit
+}
+
+object AdaptiveContext {
+  def apply(session: SparkSession): AdaptiveContext = new AdaptiveContextImpl(session)
+
+  // SparkContext local property which, when "true", forces the adaptive flag on. It is set by
+  // `enableAdaptiveContext()`, which is for test use.
+  private val GLUTEN_IS_ADAPTIVE_CONTEXT = "gluten.isAdaptiveContext"
+
+  // Index of the stack frame inspected by `setAdaptiveContext()`. This is an empirical value and
+  // may need to be changed for supporting other versions of Spark.
+  private val aqeStackTraceIndex = 19
+
+  // Per-thread stacks with the innermost scope at the head: `set*` prepends, `reset*` removes
+  // the head. This one holds the original plan for a possible entire fallback.
+  private val localOriginalPlans: ThreadLocal[ListBuffer[SparkPlan]] =
+    ThreadLocal.withInitial(() => ListBuffer.empty[SparkPlan])
+  // Per-thread stack of "is adaptive" flags, maintained the same way as `localOriginalPlans`.
+  private val localIsAdaptiveContextFlags: ThreadLocal[ListBuffer[Boolean]] =
+    ThreadLocal.withInitial(() => ListBuffer.empty[Boolean])
+
+  /** Returns the innermost entry of `stack`, failing with a clear message when none is set. */
+  private def peek[T](stack: ListBuffer[T], what: String): T =
+    stack.headOption.getOrElse(
+      throw new IllegalStateException(s"No $what is set on the current thread"))
+
+  /** Removes the innermost entry of `stack`, failing with a clear message on unbalanced reset. */
+  private def pop[T](stack: ListBuffer[T], what: String): Unit = {
+    if (stack.isEmpty) {
+      throw new IllegalStateException(s"Cannot reset $what: none is set on the current thread")
+    }
+    stack.remove(0)
+  }
+
+  private class AdaptiveContextImpl(session: SparkSession) extends AdaptiveContext {
+    // Just for test use.
+    override def enableAdaptiveContext(): Unit = {
+      session.sparkContext.setLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT, "true")
+    }
+
+    // True when the local property forces it. Otherwise returns the innermost flag pushed by
+    // `setAdaptiveContext()`. The flag stack is only consulted when the property is not "true".
+    override def isAdaptiveContext(): Boolean =
+      Option(session.sparkContext.getLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT))
+        .getOrElse("false")
+        .toBoolean ||
+        peek(localIsAdaptiveContextFlags.get(), "adaptive context flag")
+
+    override def setAdaptiveContext(): Unit = {
+      // Capture the stack right here: `aqeStackTraceIndex` is counted relative to this call.
+      val traceElements = Thread.currentThread.getStackTrace
+      assert(
+        traceElements.length > aqeStackTraceIndex,
+        s"The number of stack trace elements is expected to be more than $aqeStackTraceIndex")
+      // ApplyColumnarRulesAndInsertTransitions is called by either QueryExecution or
+      // AdaptiveSparkPlanExec. So by checking the stack trace, we can know whether the
+      // columnar rule will be applied in an adaptive execution context. This part of the code
+      // needs to be carefully checked when supporting higher versions of Spark to make
+      // sure the calling stack has not been changed.
+      localIsAdaptiveContextFlags
+        .get()
+        .prepend(
+          traceElements(aqeStackTraceIndex).getClassName
+            .equals(AdaptiveSparkPlanExec.getClass.getName))
+    }
+
+    override def resetAdaptiveContext(): Unit =
+      pop(localIsAdaptiveContextFlags.get(), "adaptive context flag")
+
+    override def setOriginalPlan(plan: SparkPlan): Unit = {
+      localOriginalPlans.get().prepend(plan)
+    }
+
+    override def originalPlan(): SparkPlan = {
+      val plan = peek(localOriginalPlans.get(), "original plan")
+      assert(plan != null, "The original plan registered for fallback must not be null")
+      plan
+    }
+
+    override def resetOriginalPlan(): Unit = pop(localOriginalPlans.get(), "original plan")
+  }
+}
diff --git
a/gluten-core/src/main/scala/org/apache/gluten/planner/GlutenOptimization.scala
b/gluten-core/src/main/scala/org/apache/gluten/planner/GlutenOptimization.scala
index ea45e1632..98c4ca37c 100644
---
a/gluten-core/src/main/scala/org/apache/gluten/planner/GlutenOptimization.scala
+++
b/gluten-core/src/main/scala/org/apache/gluten/planner/GlutenOptimization.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.planner.cost.GlutenCostModel
import org.apache.gluten.planner.metadata.GlutenMetadataModel
import org.apache.gluten.planner.plan.GlutenPlanModel
import org.apache.gluten.planner.property.GlutenPropertyModel
-import org.apache.gluten.planner.rule.GlutenRules
import org.apache.gluten.ras.{Optimization, RasExplain}
import org.apache.gluten.ras.rule.RasRule
@@ -31,16 +30,6 @@ object GlutenOptimization {
override def describeNode(node: SparkPlan): String = node.nodeName
}
- def apply(): Optimization[SparkPlan] = {
- Optimization[SparkPlan](
- GlutenPlanModel(),
- GlutenCostModel(),
- GlutenMetadataModel(),
- GlutenPropertyModel(),
- GlutenExplain,
- RasRule.Factory.reuse(GlutenRules()))
- }
-
def apply(rules: Seq[RasRule[SparkPlan]]): Optimization[SparkPlan] = {
Optimization[SparkPlan](
GlutenPlanModel(),
diff --git
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
index a106e84a5..7308703e7 100644
---
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
+++
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
@@ -18,8 +18,7 @@ package org.apache.spark.sql.execution.datasources
import org.apache.gluten.execution.{ProjectExecTransformer,
SortExecTransformer, TransformSupport, WholeStageTransformer}
import org.apache.gluten.execution.datasource.GlutenFormatWriterInjects
-import org.apache.gluten.extension.ColumnarOverrideRules
-import org.apache.gluten.extension.columnar.AddTransformHintRule
+import org.apache.gluten.extension.columnar.{AddTransformHintRule,
RewriteSparkPlanRulesManager}
import
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
import org.apache.spark.rdd.RDD
@@ -46,7 +45,7 @@ trait GlutenFormatWriterInjectsBase extends
GlutenFormatWriterInjects {
}
val rules = List(
- ColumnarOverrideRules.rewriteSparkPlanRule(),
+ RewriteSparkPlanRulesManager(),
AddTransformHintRule(),
TransformPreOverrides()
)
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 5403fe45f..e2a99ef7e 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.execution
import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{ColumnarOverrideRules, GlutenPlan}
+import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.InsertTransitions
+import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.rdd.RDD
@@ -31,7 +32,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
testGluten("Fall back the whole query if one unsupported") {
withSQLConf(("spark.gluten.sql.columnar.query.fallback.threshold", "1")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark).withTransformRules(
+ val rule = new HeuristicApplier(spark).withTransformRules(
List(
_ =>
_ => {
@@ -47,7 +48,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
testGluten("Fall back the whole plan if meeting the configured threshold") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"1")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
@@ -65,7 +66,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
testGluten("Don't fall back the whole plan if NOT meeting the configured
threshold") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"4")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
@@ -85,7 +86,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
" transformable)") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"2")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
@@ -105,7 +106,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
"leaf node is transformable)") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"3")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 2158c2613..31517a141 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,8 +18,9 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{ColumnarOverrideRules, GlutenPlan}
+import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
InsertTransitions, TRANSFORM_UNSUPPORTED, TransformHints}
+import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.rdd.RDD
@@ -31,7 +32,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
testGluten("Fall back the whole query if one unsupported") {
withSQLConf(("spark.gluten.sql.columnar.query.fallback.threshold", "1")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark).withTransformRules(
+ val rule = new HeuristicApplier(spark).withTransformRules(
List(
_ =>
_ => {
@@ -47,7 +48,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
testGluten("Fall back the whole plan if meeting the configured threshold") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"1")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
@@ -65,7 +66,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
testGluten("Don't fall back the whole plan if NOT meeting the configured
threshold") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"4")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
@@ -85,7 +86,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
" transformable)") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"2")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
@@ -105,7 +106,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
"leaf node is transformable)") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"3")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 451738759..079620bf8 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,8 +18,9 @@ package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{ColumnarOverrideRules, GlutenPlan}
+import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation,
InsertTransitions, TRANSFORM_UNSUPPORTED, TransformHints}
+import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
import org.apache.gluten.utils.QueryPlanSelector
import org.apache.spark.rdd.RDD
@@ -32,7 +33,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
testGluten("Fall back the whole query if one unsupported") {
withSQLConf(("spark.gluten.sql.columnar.query.fallback.threshold", "1")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark).withTransformRules(
+ val rule = new HeuristicApplier(spark).withTransformRules(
List(
_ =>
_ => {
@@ -48,7 +49,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
testGluten("Fall back the whole plan if meeting the configured threshold") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"1")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
@@ -66,7 +67,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
testGluten("Don't fall back the whole plan if NOT meeting the configured
threshold") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"4")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
@@ -86,7 +87,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
" transformable)") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"2")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
@@ -106,7 +107,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
"leaf node is transformable)") {
withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold",
"3")) {
val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
- val rule = ColumnarOverrideRules(spark)
+ val rule = new HeuristicApplier(spark)
.enableAdaptiveContext()
.withTransformRules(
List(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]