This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new dc4bc629a [VL] RAS: Add EnumeratedApplier to manage columnar rule 
applications when ras is enabled (#5276)
dc4bc629a is described below

commit dc4bc629aaadbdbd285f6d0654643733a8542607
Author: Hongze Zhang <[email protected]>
AuthorDate: Sun Apr 7 12:06:02 2024 +0800

    [VL] RAS: Add EnumeratedApplier to manage columnar rule applications when 
ras is enabled (#5276)
---
 .../org/apache/gluten/planner/VeloxRasSuite.scala  |   2 +-
 .../gluten/extension/ColumnarOverrides.scala       | 265 +--------------------
 .../columnar/ColumnarRuleApplier.scala}            |  10 +-
 .../extension/columnar/ColumnarTransitions.scala   |  38 ++-
 .../{transform => }/ImplementSingleNode.scala      |   3 +-
 .../extension/columnar/MiscColumnarRules.scala     |   3 +-
 .../columnar/RewriteSparkPlanRulesManager.scala    |  18 +-
 .../columnar/enumerated/EnumeratedApplier.scala    | 173 ++++++++++++++
 .../{ => enumerated}/EnumeratedTransform.scala     |   4 +-
 .../columnar/heuristic/HeuristicApplier.scala      | 173 ++++++++++++++
 .../extension/columnar/util/AdaptiveContext.scala  |  93 ++++++++
 .../apache/gluten/planner/GlutenOptimization.scala |  11 -
 .../GlutenFormatWriterInjectsBase.scala            |   5 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |  13 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |  13 +-
 .../sql/execution/FallbackStrategiesSuite.scala    |  13 +-
 16 files changed, 534 insertions(+), 303 deletions(-)

diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala 
b/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
index 4b1ee935f..cf600ff56 100644
--- 
a/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/planner/VeloxRasSuite.scala
@@ -95,7 +95,7 @@ class VeloxRasSuite extends SharedSparkSession {
 
 object VeloxRasSuite {
   def newRas(): Ras[SparkPlan] = {
-    GlutenOptimization().asInstanceOf[Ras[SparkPlan]]
+    GlutenOptimization(List()).asInstanceOf[Ras[SparkPlan]]
   }
 
   def newRas(RasRules: Seq[RasRule[SparkPlan]]): Ras[SparkPlan] = {
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
index b664aa988..8ba51342a 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/ColumnarOverrides.scala
@@ -17,12 +17,10 @@
 package org.apache.gluten.extension
 
 import org.apache.gluten.{GlutenConfig, GlutenSparkExtensionsInjector}
-import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.extension.columnar._
-import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
 RemoveTopmostColumnarToRow, TransformPostOverrides, TransformPreOverrides}
-import org.apache.gluten.extension.columnar.transform._
-import org.apache.gluten.metrics.GlutenTimeMetric
-import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector, PlanUtil}
+import org.apache.gluten.extension.columnar.enumerated.EnumeratedApplier
+import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
+import org.apache.gluten.utils.LogLevelUtil
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
@@ -30,68 +28,11 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive._
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.SparkRuleUtil
-
-import scala.collection.mutable.ListBuffer
-
-private[extension] object ColumnarToRowLike {
-  def unapply(plan: SparkPlan): Option[SparkPlan] = {
-    plan match {
-      case c2r: ColumnarToRowTransition =>
-        Some(c2r.child)
-      case _ => None
-    }
-  }
-}
-// This rule will try to add RowToColumnarExecBase and ColumnarToRowExec
-// to support vanilla columnar operators.
-case class InsertColumnarToColumnarTransitions(session: SparkSession) extends 
Rule[SparkPlan] {
-  @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
-
-  private def replaceWithVanillaColumnarToRow(p: SparkPlan): SparkPlan = 
p.transformUp {
-    case plan if PlanUtil.isGlutenColumnarOp(plan) =>
-      plan.withNewChildren(plan.children.map {
-        case child if PlanUtil.isVanillaColumnarOp(child) =>
-          BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(
-            ColumnarToRowExec(child))
-        case other => other
-      })
-  }
-
-  private def replaceWithVanillaRowToColumnar(p: SparkPlan): SparkPlan = 
p.transformUp {
-    case plan if PlanUtil.isVanillaColumnarOp(plan) =>
-      plan.withNewChildren(plan.children.map {
-        case child if PlanUtil.isGlutenColumnarOp(child) =>
-          RowToColumnarExec(
-            
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child))
-        case other => other
-      })
-  }
-
-  def apply(plan: SparkPlan): SparkPlan = {
-    val newPlan = 
replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan))
-    planChangeLogger.logRule(ruleName, plan, newPlan)
-    newPlan
-  }
-}
 
 object ColumnarOverrideRules {
-  val GLUTEN_IS_ADAPTIVE_CONTEXT = "gluten.isAdaptiveContext"
-
-  def rewriteSparkPlanRule(): Rule[SparkPlan] = {
-    val rewriteRules = Seq(
-      RewriteIn,
-      RewriteMultiChildrenCount,
-      RewriteCollect,
-      RewriteTypedImperativeAggregate,
-      PullOutPreProject,
-      PullOutPostProject)
-    new RewriteSparkPlanRulesManager(rewriteRules)
-  }
 
   // Utilities to infer columnar rule's caller's property:
   // ApplyColumnarRulesAndInsertTransitions#outputsColumnar.
@@ -160,158 +101,6 @@ case class ColumnarOverrideRules(session: SparkSession)
 
   import ColumnarOverrideRules._
 
-  private lazy val transformPlanLogLevel = 
GlutenConfig.getConf.transformPlanLogLevel
-  @transient private lazy val planChangeLogger = new 
PlanChangeLogger[SparkPlan]()
-
-  // This is an empirical value, may need to be changed for supporting other 
versions of spark.
-  private val aqeStackTraceIndex = 18
-
-  // Holds the original plan for possible entire fallback.
-  private val localOriginalPlans: ThreadLocal[ListBuffer[SparkPlan]] =
-    ThreadLocal.withInitial(() => ListBuffer.empty[SparkPlan])
-  private val localIsAdaptiveContextFlags: ThreadLocal[ListBuffer[Boolean]] =
-    ThreadLocal.withInitial(() => ListBuffer.empty[Boolean])
-
-  // Do not create rules in class initialization as we should access SQLConf
-  // while creating the rules. At this time SQLConf may not be there yet.
-
-  // Just for test use.
-  def enableAdaptiveContext(): ColumnarOverrideRules = {
-    session.sparkContext.setLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT, "true")
-    this
-  }
-
-  private def isAdaptiveContext: Boolean =
-    Option(session.sparkContext.getLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT))
-      .getOrElse("false")
-      .toBoolean ||
-      localIsAdaptiveContextFlags.get().head
-
-  private def setAdaptiveContext(): Unit = {
-    val traceElements = Thread.currentThread.getStackTrace
-    assert(
-      traceElements.length > aqeStackTraceIndex,
-      s"The number of stack trace elements is expected to be more than 
$aqeStackTraceIndex")
-    // ApplyColumnarRulesAndInsertTransitions is called by either 
QueryExecution or
-    // AdaptiveSparkPlanExec. So by checking the stack trace, we can know 
whether
-    // columnar rule will be applied in adaptive execution context. This part 
of code
-    // needs to be carefully checked when supporting higher versions of spark 
to make
-    // sure the calling stack has not been changed.
-    localIsAdaptiveContextFlags
-      .get()
-      .prepend(
-        traceElements(aqeStackTraceIndex).getClassName
-          .equals(AdaptiveSparkPlanExec.getClass.getName))
-  }
-
-  private def resetAdaptiveContext(): Unit =
-    localIsAdaptiveContextFlags.get().remove(0)
-
-  private def setOriginalPlan(plan: SparkPlan): Unit = {
-    localOriginalPlans.get.prepend(plan)
-  }
-
-  private def originalPlan: SparkPlan = {
-    val plan = localOriginalPlans.get.head
-    assert(plan != null)
-    plan
-  }
-
-  private def resetOriginalPlan(): Unit = localOriginalPlans.get.remove(0)
-
-  /**
-   * Rules to let planner create a suggested Gluten plan being sent to 
`fallbackPolicies` in which
-   * the plan will be breakdown and decided to be fallen back or not.
-   */
-  private def transformRules(outputsColumnar: Boolean): List[SparkSession => 
Rule[SparkPlan]] = {
-
-    def maybeRas(outputsColumnar: Boolean): List[SparkSession => 
Rule[SparkPlan]] = {
-      if (GlutenConfig.getConf.enableRas) {
-        return List(
-          (_: SparkSession) => TransformPreOverrides(List(ImplementFilter()), 
List.empty),
-          (session: SparkSession) => EnumeratedTransform(session, 
outputsColumnar),
-          (_: SparkSession) => RemoveTransitions,
-          (_: SparkSession) => TransformPreOverrides(List.empty, 
List(ImplementAggregate()))
-        )
-      }
-      List((_: SparkSession) => TransformPreOverrides())
-    }
-
-    List(
-      (_: SparkSession) => RemoveTransitions,
-      (spark: SparkSession) => FallbackOnANSIMode(spark),
-      (spark: SparkSession) => FallbackMultiCodegens(spark),
-      (spark: SparkSession) => PlanOneRowRelation(spark),
-      (_: SparkSession) => FallbackEmptySchemaRelation()
-    ) :::
-      
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules()
 :::
-      List(
-        (spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark),
-        (_: SparkSession) => rewriteSparkPlanRule(),
-        (_: SparkSession) => AddTransformHintRule(),
-        (_: SparkSession) => FallbackBloomFilterAggIfNeeded()
-      ) :::
-      maybeRas(outputsColumnar) :::
-      List(
-        (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
-        (spark: SparkSession) => RewriteTransformer(spark),
-        (_: SparkSession) => EnsureLocalSortRequirements,
-        (_: SparkSession) => CollapseProjectExecTransformer
-      ) :::
-      
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules()
 :::
-      SparkRuleUtil
-        .extendedColumnarRules(session, 
GlutenConfig.getConf.extendedColumnarTransformRules) :::
-      List((_: SparkSession) => InsertTransitions(outputsColumnar))
-  }
-
-  /**
-   * Rules to add wrapper `FallbackNode`s on top of the input plan, as hints 
to make planner fall
-   * back the whole input plan to the original vanilla Spark plan.
-   */
-  private def fallbackPolicies(): List[SparkSession => Rule[SparkPlan]] = {
-    List((_: SparkSession) => ExpandFallbackPolicy(isAdaptiveContext, 
originalPlan))
-  }
-
-  /**
-   * Rules applying to non-fallen-back Gluten plans. To do some post cleanup 
works on the plan to
-   * make sure it be able to run and be compatible with Spark's execution 
engine.
-   */
-  private def postRules(): List[SparkSession => Rule[SparkPlan]] =
-    List(
-      (_: SparkSession) => TransformPostOverrides(),
-      (s: SparkSession) => InsertColumnarToColumnarTransitions(s),
-      (s: SparkSession) => RemoveTopmostColumnarToRow(s, isAdaptiveContext)
-    ) :::
-      
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() 
:::
-      List((_: SparkSession) => 
ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
-      SparkRuleUtil.extendedColumnarRules(session, 
GlutenConfig.getConf.extendedColumnarPostRules)
-
-  /*
-   * Rules consistently applying to all input plans after all other rules have 
been applied, despite
-   * whether the input plan is fallen back or not.
-   */
-  private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
-    List(
-      // The rule is required despite whether the stage is fallen back or not. 
Since
-      // ColumnarCachedBatchSerializer is statically registered to Spark 
without a columnar rule
-      // when columnar table cache is enabled.
-      (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s),
-      (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s),
-      (_: SparkSession) => RemoveTransformHintRule()
-    )
-  }
-
-  private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = {
-    setAdaptiveContext()
-    setOriginalPlan(plan)
-    try {
-      f(plan)
-    } finally {
-      resetOriginalPlan()
-      resetAdaptiveContext()
-    }
-  }
-
   /**
    * Note: Do not implement this API. We basically inject all of Gluten's 
physical rules through
    * `postColumnarTransitions`.
@@ -327,48 +116,14 @@ case class ColumnarOverrideRules(session: SparkSession)
     val outputsColumnar = OutputsColumnarTester.inferOutputsColumnar(plan)
     val unwrapped = OutputsColumnarTester.unwrap(plan)
     val vanillaPlan = ColumnarTransitions.insertTransitions(unwrapped, 
outputsColumnar)
-    withTransformRules(transformRules(outputsColumnar)).apply(vanillaPlan)
+    val applier: ColumnarRuleApplier = if (GlutenConfig.getConf.enableRas) {
+      new EnumeratedApplier(session)
+    } else {
+      new HeuristicApplier(session)
+    }
+    applier.apply(vanillaPlan, outputsColumnar)
   }
 
-  // Visible for testing.
-  def withTransformRules(transformRules: List[SparkSession => 
Rule[SparkPlan]]): Rule[SparkPlan] =
-    plan =>
-      PhysicalPlanSelector.maybe(session, plan) {
-        val finalPlan = prepareFallback(plan) {
-          p =>
-            val suggestedPlan = transformPlan(transformRules, p, "transform")
-            transformPlan(fallbackPolicies(), suggestedPlan, "fallback") match 
{
-              case FallbackNode(fallbackPlan) =>
-                // we should use vanilla c2r rather than native c2r,
-                // and there should be no `GlutenPlan` any more,
-                // so skip the `postRules()`.
-                fallbackPlan
-              case plan =>
-                transformPlan(postRules(), plan, "post")
-            }
-        }
-        transformPlan(finalRules(), finalPlan, "final")
-      }
-
-  private def transformPlan(
-      getRules: List[SparkSession => Rule[SparkPlan]],
-      plan: SparkPlan,
-      step: String) = GlutenTimeMetric.withMillisTime {
-    logOnLevel(
-      transformPlanLogLevel,
-      s"${step}ColumnarTransitions preOverriden plan:\n${plan.toString}")
-    val overridden = getRules.foldLeft(plan) {
-      (p, getRule) =>
-        val rule = getRule(session)
-        val newPlan = rule(p)
-        planChangeLogger.logRule(rule.ruleName, p, newPlan)
-        newPlan
-    }
-    logOnLevel(
-      transformPlanLogLevel,
-      s"${step}ColumnarTransitions afterOverriden 
plan:\n${overridden.toString}")
-    overridden
-  }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: 
$t ms."))
 }
 
 object ColumnarOverrides extends GlutenSparkExtensionsInjector {
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/planner/rule/GlutenRules.scala 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
similarity index 83%
rename from 
gluten-core/src/main/scala/org/apache/gluten/planner/rule/GlutenRules.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index 59250c563..17bf01730 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/planner/rule/GlutenRules.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -14,14 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.planner.rule
-
-import org.apache.gluten.ras.rule.RasRule
+package org.apache.gluten.extension.columnar
 
 import org.apache.spark.sql.execution.SparkPlan
 
-object GlutenRules {
-  def apply(): Seq[RasRule[SparkPlan]] = {
-    List() // TODO
-  }
+trait ColumnarRuleApplier {
+  def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan
 }
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
index b6a1bd2c4..5dd266433 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarTransitions.scala
@@ -16,7 +16,11 @@
  */
 package org.apache.gluten.extension.columnar
 
-import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.utils.PlanUtil
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
 import org.apache.spark.sql.execution.{ApplyColumnarRulesAndInsertTransitions, 
ColumnarToRowExec, ColumnarToRowTransition, RowToColumnarExec, 
RowToColumnarTransition, SparkPlan}
 
 /** See rule code from vanilla Spark: 
[[ApplyColumnarRulesAndInsertTransitions]]. */
@@ -44,6 +48,38 @@ object RemoveTransitions extends Rule[SparkPlan] {
   }
 }
 
+// This rule will try to add RowToColumnarExecBase and ColumnarToRowExec
+// to support vanilla columnar operators.
+case class InsertColumnarToColumnarTransitions(session: SparkSession) extends 
Rule[SparkPlan] {
+  @transient private val planChangeLogger = new PlanChangeLogger[SparkPlan]()
+
+  private def replaceWithVanillaColumnarToRow(p: SparkPlan): SparkPlan = 
p.transformUp {
+    case plan if PlanUtil.isGlutenColumnarOp(plan) =>
+      plan.withNewChildren(plan.children.map {
+        case child if PlanUtil.isVanillaColumnarOp(child) =>
+          BackendsApiManager.getSparkPlanExecApiInstance.genRowToColumnarExec(
+            ColumnarToRowExec(child))
+        case other => other
+      })
+  }
+
+  private def replaceWithVanillaRowToColumnar(p: SparkPlan): SparkPlan = 
p.transformUp {
+    case plan if PlanUtil.isVanillaColumnarOp(plan) =>
+      plan.withNewChildren(plan.children.map {
+        case child if PlanUtil.isGlutenColumnarOp(child) =>
+          RowToColumnarExec(
+            
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToRowExec(child))
+        case other => other
+      })
+  }
+
+  def apply(plan: SparkPlan): SparkPlan = {
+    val newPlan = 
replaceWithVanillaRowToColumnar(replaceWithVanillaColumnarToRow(plan))
+    planChangeLogger.logRule(ruleName, plan, newPlan)
+    newPlan
+  }
+}
+
 object ColumnarTransitions {
   def insertTransitions(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan 
= {
     InsertTransitions(outputsColumnar).apply(plan)
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transform/ImplementSingleNode.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ImplementSingleNode.scala
similarity index 99%
rename from 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transform/ImplementSingleNode.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ImplementSingleNode.scala
index 5820de270..3d0669de1 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/transform/ImplementSingleNode.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ImplementSingleNode.scala
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.extension.columnar.transform
+package org.apache.gluten.extension.columnar
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.BackendsApiManager
@@ -22,7 +22,6 @@ import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
 import org.apache.gluten.expression.ExpressionConverter
 import org.apache.gluten.extension.GlutenPlan
-import org.apache.gluten.extension.columnar.TransformHints
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
 
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
index aa3c54144..0fd0856f2 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/MiscColumnarRules.scala
@@ -17,8 +17,7 @@
 package org.apache.gluten.extension.columnar
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.extension.ColumnarToRowLike
-import org.apache.gluten.extension.columnar.transform.{ImplementAggregate, 
ImplementExchange, ImplementFilter, ImplementJoin, ImplementOthers, 
ImplementSingleNode}
+import 
org.apache.gluten.extension.columnar.ColumnarTransitions.ColumnarToRowLike
 import org.apache.gluten.utils.{LogLevelUtil, PlanUtil}
 
 import org.apache.spark.sql.SparkSession
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RewriteSparkPlanRulesManager.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RewriteSparkPlanRulesManager.scala
index 7de39d5fd..e694c23a7 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RewriteSparkPlanRulesManager.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/RewriteSparkPlanRulesManager.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.extension.columnar
 
+import org.apache.gluten.extension.{RewriteCollect, RewriteIn}
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
@@ -41,7 +43,8 @@ case class RewrittenNodeWall(originalChild: SparkPlan) 
extends LeafExecNode {
  *
  * Note that, this rule does not touch and tag these operators who does not 
need to rewrite.
  */
-class RewriteSparkPlanRulesManager(rewriteRules: Seq[Rule[SparkPlan]]) extends 
Rule[SparkPlan] {
+class RewriteSparkPlanRulesManager private (rewriteRules: Seq[Rule[SparkPlan]])
+  extends Rule[SparkPlan] {
 
   private def mayNeedRewrite(plan: SparkPlan): Boolean = {
     TransformHints.isTransformable(plan) && {
@@ -125,3 +128,16 @@ class RewriteSparkPlanRulesManager(rewriteRules: 
Seq[Rule[SparkPlan]]) extends R
     }
   }
 }
+
+object RewriteSparkPlanRulesManager {
+  def apply(): Rule[SparkPlan] = {
+    val rewriteRules = Seq(
+      RewriteIn,
+      RewriteMultiChildrenCount,
+      RewriteCollect,
+      RewriteTypedImperativeAggregate,
+      PullOutPreProject,
+      PullOutPostProject)
+    new RewriteSparkPlanRulesManager(rewriteRules)
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
new file mode 100644
index 000000000..d58635e38
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.enumerated
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar._
+import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
 RemoveTopmostColumnarToRow, TransformPostOverrides, TransformPreOverrides}
+import org.apache.gluten.extension.columnar.util.AdaptiveContext
+import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, 
GlutenFallbackReporter, SparkPlan}
+import org.apache.spark.util.SparkRuleUtil
+
+class EnumeratedApplier(session: SparkSession)
+  extends ColumnarRuleApplier
+  with Logging
+  with LogLevelUtil {
+
+  private lazy val transformPlanLogLevel = 
GlutenConfig.getConf.transformPlanLogLevel
+  private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]()
+
+  private val adaptiveContext = AdaptiveContext(session)
+
+  override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
+    withTransformRules(transformRules(outputsColumnar)).apply(plan)
+
+  // Visible for testing.
+  private def withTransformRules(
+      transformRules: List[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] =
+    plan =>
+      PhysicalPlanSelector.maybe(session, plan) {
+        val finalPlan = prepareFallback(plan) {
+          p =>
+            val suggestedPlan = transformPlan(transformRules, p, "transform")
+            transformPlan(fallbackPolicies(), suggestedPlan, "fallback") match 
{
+              case FallbackNode(fallbackPlan) =>
+                // we should use vanilla c2r rather than native c2r,
+                // and there should be no `GlutenPlan` any more,
+                // so skip the `postRules()`.
+                fallbackPlan
+              case plan =>
+                transformPlan(postRules(), plan, "post")
+            }
+        }
+        transformPlan(finalRules(), finalPlan, "final")
+      }
+
+  private def transformPlan(
+      getRules: List[SparkSession => Rule[SparkPlan]],
+      plan: SparkPlan,
+      step: String) = GlutenTimeMetric.withMillisTime {
+    logOnLevel(
+      transformPlanLogLevel,
+      s"${step}ColumnarTransitions preOverriden plan:\n${plan.toString}")
+    val overridden = getRules.foldLeft(plan) {
+      (p, getRule) =>
+        val rule = getRule(session)
+        val newPlan = rule(p)
+        planChangeLogger.logRule(rule.ruleName, p, newPlan)
+        newPlan
+    }
+    logOnLevel(
+      transformPlanLogLevel,
+      s"${step}ColumnarTransitions afterOverriden 
plan:\n${overridden.toString}")
+    overridden
+  }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: 
$t ms."))
+
+  private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = {
+    adaptiveContext.setAdaptiveContext()
+    adaptiveContext.setOriginalPlan(plan)
+    try {
+      f(plan)
+    } finally {
+      adaptiveContext.resetOriginalPlan()
+      adaptiveContext.resetAdaptiveContext()
+    }
+  }
+
+  /**
+   * Rules to let planner create a suggested Gluten plan being sent to 
`fallbackPolicies` in which
+   * the plan will be breakdown and decided to be fallen back or not.
+   */
+  private def transformRules(outputsColumnar: Boolean): List[SparkSession => 
Rule[SparkPlan]] = {
+    List(
+      (_: SparkSession) => RemoveTransitions,
+      (spark: SparkSession) => FallbackOnANSIMode(spark),
+      (spark: SparkSession) => FallbackMultiCodegens(spark),
+      (spark: SparkSession) => PlanOneRowRelation(spark),
+      (_: SparkSession) => FallbackEmptySchemaRelation()
+    ) :::
+      
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules()
+    List(
+      (spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark),
+      (_: SparkSession) => RewriteSparkPlanRulesManager(),
+      (_: SparkSession) => AddTransformHintRule(),
+      (_: SparkSession) => FallbackBloomFilterAggIfNeeded()
+    ) :::
+      List(
+        (_: SparkSession) => TransformPreOverrides(List(ImplementFilter()), 
List.empty),
+        (session: SparkSession) => EnumeratedTransform(session, 
outputsColumnar),
+        (_: SparkSession) => RemoveTransitions,
+        (_: SparkSession) => TransformPreOverrides(List.empty, 
List(ImplementAggregate()))
+      ) :::
+      List(
+        (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
+        (spark: SparkSession) => RewriteTransformer(spark),
+        (_: SparkSession) => EnsureLocalSortRequirements,
+        (_: SparkSession) => CollapseProjectExecTransformer
+      ) :::
+      
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules()
 :::
+      SparkRuleUtil
+        .extendedColumnarRules(session, 
GlutenConfig.getConf.extendedColumnarTransformRules) :::
+      List((_: SparkSession) => InsertTransitions(outputsColumnar))
+  }
+
+  /**
+   * Rules to add wrapper `FallbackNode`s on top of the input plan, as hints 
to make planner fall
+   * back the whole input plan to the original vanilla Spark plan.
+   */
+  private def fallbackPolicies(): List[SparkSession => Rule[SparkPlan]] = {
+    List(
+      (_: SparkSession) =>
+        ExpandFallbackPolicy(adaptiveContext.isAdaptiveContext(), 
adaptiveContext.originalPlan()))
+  }
+
+  /**
+   * Rules applying to non-fallen-back Gluten plans. To do some post cleanup 
works on the plan to
+   * make sure it be able to run and be compatible with Spark's execution 
engine.
+   */
+  private def postRules(): List[SparkSession => Rule[SparkPlan]] =
+    List(
+      (_: SparkSession) => TransformPostOverrides(),
+      (s: SparkSession) => InsertColumnarToColumnarTransitions(s),
+      (s: SparkSession) => RemoveTopmostColumnarToRow(s, 
adaptiveContext.isAdaptiveContext())
+    ) :::
+      
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() 
:::
+      List((_: SparkSession) => 
ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
+      SparkRuleUtil.extendedColumnarRules(session, 
GlutenConfig.getConf.extendedColumnarPostRules)
+
+  /*
+   * Rules consistently applying to all input plans after all other rules have 
been applied, despite
+   * whether the input plan is fallen back or not.
+   */
+  private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
+    List(
+      // The rule is required despite whether the stage is fallen back or not. 
Since
+      // ColumnarCachedBatchSerializer is statically registered to Spark 
without a columnar rule
+      // when columnar table cache is enabled.
+      (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s),
+      (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s),
+      (_: SparkSession) => RemoveTransformHintRule()
+    )
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnumeratedTransform.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
similarity index 93%
rename from 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnumeratedTransform.scala
rename to 
gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
index 832ff5883..0189f975d 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/EnumeratedTransform.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedTransform.scala
@@ -14,9 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.extension.columnar
+package org.apache.gluten.extension.columnar.enumerated
 
-import org.apache.gluten.extension.columnar.transform.{ImplementExchange, 
ImplementJoin, ImplementOthers, ImplementSingleNode}
+import org.apache.gluten.extension.columnar.{ImplementExchange, ImplementJoin, 
ImplementOthers, ImplementSingleNode}
 import org.apache.gluten.planner.GlutenOptimization
 import org.apache.gluten.planner.property.GlutenProperties
 import org.apache.gluten.ras.property.PropertySet
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
new file mode 100644
index 000000000..3fc2f325c
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.heuristic
+
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar._
+import 
org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow,
 RemoveTopmostColumnarToRow, TransformPostOverrides, TransformPreOverrides}
+import org.apache.gluten.extension.columnar.util.AdaptiveContext
+import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, 
GlutenFallbackReporter, SparkPlan}
+import org.apache.spark.util.SparkRuleUtil
+
+class HeuristicApplier(session: SparkSession)
+  extends ColumnarRuleApplier
+  with Logging
+  with LogLevelUtil {
+
+  private lazy val transformPlanLogLevel = 
GlutenConfig.getConf.transformPlanLogLevel
+  private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]()
+
+  private val adaptiveContext = AdaptiveContext(session)
+
+  override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
+    withTransformRules(transformRules(outputsColumnar)).apply(plan)
+
+  // Visible for testing.
+  def withTransformRules(transformRules: List[SparkSession => 
Rule[SparkPlan]]): Rule[SparkPlan] =
+    plan =>
+      PhysicalPlanSelector.maybe(session, plan) {
+        val finalPlan = prepareFallback(plan) {
+          p =>
+            val suggestedPlan = transformPlan(transformRules, p, "transform")
+            transformPlan(fallbackPolicies(), suggestedPlan, "fallback") match 
{
+              case FallbackNode(fallbackPlan) =>
+                // we should use vanilla c2r rather than native c2r,
+                // and there should be no `GlutenPlan` any more,
+                // so skip the `postRules()`.
+                fallbackPlan
+              case plan =>
+                transformPlan(postRules(), plan, "post")
+            }
+        }
+        transformPlan(finalRules(), finalPlan, "final")
+      }
+
+  private def transformPlan(
+      getRules: List[SparkSession => Rule[SparkPlan]],
+      plan: SparkPlan,
+      step: String) = GlutenTimeMetric.withMillisTime {
+    logOnLevel(
+      transformPlanLogLevel,
+      s"${step}ColumnarTransitions preOverridden plan:\n${plan.toString}")
+    val overridden = getRules.foldLeft(plan) {
+      (p, getRule) =>
+        val rule = getRule(session)
+        val newPlan = rule(p)
+        planChangeLogger.logRule(rule.ruleName, p, newPlan)
+        newPlan
+    }
+    logOnLevel(
+      transformPlanLogLevel,
+      s"${step}ColumnarTransitions afterOverridden 
plan:\n${overridden.toString}")
+    overridden
+  }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: 
$t ms."))
+
+  private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = {
+    adaptiveContext.setAdaptiveContext()
+    adaptiveContext.setOriginalPlan(plan)
+    try {
+      f(plan)
+    } finally {
+      adaptiveContext.resetOriginalPlan()
+      adaptiveContext.resetAdaptiveContext()
+    }
+  }
+
+  /**
+   * Rules to let planner create a suggested Gluten plan being sent to 
`fallbackPolicies` in which
+   * the plan will be breakdown and decided to be fallen back or not.
+   */
+  private def transformRules(outputsColumnar: Boolean): List[SparkSession => 
Rule[SparkPlan]] = {
+    List(
+      (_: SparkSession) => RemoveTransitions,
+      (spark: SparkSession) => FallbackOnANSIMode(spark),
+      (spark: SparkSession) => FallbackMultiCodegens(spark),
+      (spark: SparkSession) => PlanOneRowRelation(spark),
+      (_: SparkSession) => FallbackEmptySchemaRelation()
+    ) :::
+      
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarValidationRules()
 :::
+      List(
+        (spark: SparkSession) => MergeTwoPhasesHashBaseAggregate(spark),
+        (_: SparkSession) => RewriteSparkPlanRulesManager(),
+        (_: SparkSession) => AddTransformHintRule(),
+        (_: SparkSession) => FallbackBloomFilterAggIfNeeded()
+      ) :::
+      List((_: SparkSession) => TransformPreOverrides()) :::
+      List(
+        (_: SparkSession) => RemoveNativeWriteFilesSortAndProject(),
+        (spark: SparkSession) => RewriteTransformer(spark),
+        (_: SparkSession) => EnsureLocalSortRequirements,
+        (_: SparkSession) => CollapseProjectExecTransformer
+      ) :::
+      
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarTransformRules()
 :::
+      SparkRuleUtil
+        .extendedColumnarRules(session, 
GlutenConfig.getConf.extendedColumnarTransformRules) :::
+      List((_: SparkSession) => InsertTransitions(outputsColumnar))
+  }
+
+  /**
+   * Rules to add wrapper `FallbackNode`s on top of the input plan, as hints 
to make planner fall
+   * back the whole input plan to the original vanilla Spark plan.
+   */
+  private def fallbackPolicies(): List[SparkSession => Rule[SparkPlan]] = {
+    List(
+      (_: SparkSession) =>
+        ExpandFallbackPolicy(adaptiveContext.isAdaptiveContext(), 
adaptiveContext.originalPlan()))
+  }
+
+  /**
+   * Rules applying to non-fallen-back Gluten plans. To do some post cleanup 
works on the plan to
+   * make sure it be able to run and be compatible with Spark's execution 
engine.
+   */
+  private def postRules(): List[SparkSession => Rule[SparkPlan]] =
+    List(
+      (_: SparkSession) => TransformPostOverrides(),
+      (s: SparkSession) => InsertColumnarToColumnarTransitions(s),
+      (s: SparkSession) => RemoveTopmostColumnarToRow(s, 
adaptiveContext.isAdaptiveContext())
+    ) :::
+      
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() 
:::
+      List((_: SparkSession) => 
ColumnarCollapseTransformStages(GlutenConfig.getConf)) :::
+      SparkRuleUtil.extendedColumnarRules(session, 
GlutenConfig.getConf.extendedColumnarPostRules)
+
+  /*
+   * Rules consistently applying to all input plans after all other rules have 
been applied, despite
+   * whether the input plan is fallen back or not.
+   */
+  private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
+    List(
+      // The rule is required despite whether the stage is fallen back or not. 
Since
+      // ColumnarCachedBatchSerializer is statically registered to Spark 
without a columnar rule
+      // when columnar table cache is enabled.
+      (s: SparkSession) => RemoveGlutenTableCacheColumnarToRow(s),
+      (s: SparkSession) => GlutenFallbackReporter(GlutenConfig.getConf, s),
+      (_: SparkSession) => RemoveTransformHintRule()
+    )
+  }
+
+  // Just for test use.
+  def enableAdaptiveContext(): HeuristicApplier = {
+    adaptiveContext.enableAdaptiveContext()
+    this
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
new file mode 100644
index 000000000..0592a3aca
--- /dev/null
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.extension.columnar.util
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
+
+import scala.collection.mutable.ListBuffer
+
+sealed trait AdaptiveContext {
+  def enableAdaptiveContext(): Unit
+  def isAdaptiveContext(): Boolean
+  def setAdaptiveContext(): Unit
+  def resetAdaptiveContext(): Unit
+  def setOriginalPlan(plan: SparkPlan): Unit
+  def originalPlan(): SparkPlan
+  def resetOriginalPlan(): Unit
+}
+
+object AdaptiveContext {
+  def apply(session: SparkSession): AdaptiveContext = new 
AdaptiveContextImpl(session)
+
+  private val GLUTEN_IS_ADAPTIVE_CONTEXT = "gluten.isAdaptiveContext"
+
+  // This is an empirical value, may need to be changed for supporting other 
versions of spark.
+  private val aqeStackTraceIndex = 19
+
+  // Holds the original plan for possible entire fallback.
+  private val localOriginalPlans: ThreadLocal[ListBuffer[SparkPlan]] =
+    ThreadLocal.withInitial(() => ListBuffer.empty[SparkPlan])
+  private val localIsAdaptiveContextFlags: ThreadLocal[ListBuffer[Boolean]] =
+    ThreadLocal.withInitial(() => ListBuffer.empty[Boolean])
+
+  private class AdaptiveContextImpl(session: SparkSession) extends 
AdaptiveContext {
+    // Just for test use.
+    override def enableAdaptiveContext(): Unit = {
+      session.sparkContext.setLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT, "true")
+    }
+
+    override def isAdaptiveContext(): Boolean =
+      Option(session.sparkContext.getLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT))
+        .getOrElse("false")
+        .toBoolean ||
+        localIsAdaptiveContextFlags.get().head
+
+    override def setAdaptiveContext(): Unit = {
+      val traceElements = Thread.currentThread.getStackTrace
+      assert(
+        traceElements.length > aqeStackTraceIndex,
+        s"The number of stack trace elements is expected to be more than 
$aqeStackTraceIndex")
+      // ApplyColumnarRulesAndInsertTransitions is called by either 
QueryExecution or
+      // AdaptiveSparkPlanExec. So by checking the stack trace, we can know 
whether
+      // columnar rule will be applied in adaptive execution context. This 
part of code
+      // needs to be carefully checked when supporting higher versions of 
spark to make
+      // sure the calling stack has not been changed.
+      localIsAdaptiveContextFlags
+        .get()
+        .prepend(
+          traceElements(aqeStackTraceIndex).getClassName
+            .equals(AdaptiveSparkPlanExec.getClass.getName))
+    }
+
+    override def resetAdaptiveContext(): Unit =
+      localIsAdaptiveContextFlags.get().remove(0)
+
+    override def setOriginalPlan(plan: SparkPlan): Unit = {
+      localOriginalPlans.get().prepend(plan)
+    }
+
+    override def originalPlan(): SparkPlan = {
+      val plan = localOriginalPlans.get().head
+      assert(plan != null)
+      plan
+    }
+
+    override def resetOriginalPlan(): Unit = localOriginalPlans.get().remove(0)
+  }
+}
diff --git 
a/gluten-core/src/main/scala/org/apache/gluten/planner/GlutenOptimization.scala 
b/gluten-core/src/main/scala/org/apache/gluten/planner/GlutenOptimization.scala
index ea45e1632..98c4ca37c 100644
--- 
a/gluten-core/src/main/scala/org/apache/gluten/planner/GlutenOptimization.scala
+++ 
b/gluten-core/src/main/scala/org/apache/gluten/planner/GlutenOptimization.scala
@@ -20,7 +20,6 @@ import org.apache.gluten.planner.cost.GlutenCostModel
 import org.apache.gluten.planner.metadata.GlutenMetadataModel
 import org.apache.gluten.planner.plan.GlutenPlanModel
 import org.apache.gluten.planner.property.GlutenPropertyModel
-import org.apache.gluten.planner.rule.GlutenRules
 import org.apache.gluten.ras.{Optimization, RasExplain}
 import org.apache.gluten.ras.rule.RasRule
 
@@ -31,16 +30,6 @@ object GlutenOptimization {
     override def describeNode(node: SparkPlan): String = node.nodeName
   }
 
-  def apply(): Optimization[SparkPlan] = {
-    Optimization[SparkPlan](
-      GlutenPlanModel(),
-      GlutenCostModel(),
-      GlutenMetadataModel(),
-      GlutenPropertyModel(),
-      GlutenExplain,
-      RasRule.Factory.reuse(GlutenRules()))
-  }
-
   def apply(rules: Seq[RasRule[SparkPlan]]): Optimization[SparkPlan] = {
     Optimization[SparkPlan](
       GlutenPlanModel(),
diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
index a106e84a5..7308703e7 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
@@ -18,8 +18,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.gluten.execution.{ProjectExecTransformer, 
SortExecTransformer, TransformSupport, WholeStageTransformer}
 import org.apache.gluten.execution.datasource.GlutenFormatWriterInjects
-import org.apache.gluten.extension.ColumnarOverrideRules
-import org.apache.gluten.extension.columnar.AddTransformHintRule
+import org.apache.gluten.extension.columnar.{AddTransformHintRule, 
RewriteSparkPlanRulesManager}
 import 
org.apache.gluten.extension.columnar.MiscColumnarRules.TransformPreOverrides
 
 import org.apache.spark.rdd.RDD
@@ -46,7 +45,7 @@ trait GlutenFormatWriterInjectsBase extends 
GlutenFormatWriterInjects {
     }
 
     val rules = List(
-      ColumnarOverrideRules.rewriteSparkPlanRule(),
+      RewriteSparkPlanRulesManager(),
       AddTransformHintRule(),
       TransformPreOverrides()
     )
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 5403fe45f..e2a99ef7e 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -17,8 +17,9 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{ColumnarOverrideRules, GlutenPlan}
+import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.InsertTransitions
+import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.utils.QueryPlanSelector
 
 import org.apache.spark.rdd.RDD
@@ -31,7 +32,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   testGluten("Fall back the whole query if one unsupported") {
     withSQLConf(("spark.gluten.sql.columnar.query.fallback.threshold", "1")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark).withTransformRules(
+      val rule = new HeuristicApplier(spark).withTransformRules(
         List(
           _ =>
             _ => {
@@ -47,7 +48,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   testGluten("Fall back the whole plan if meeting the configured threshold") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"1")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(
@@ -65,7 +66,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   testGluten("Don't fall back the whole plan if NOT meeting the configured 
threshold") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"4")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(
@@ -85,7 +86,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
       " transformable)") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"2")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(
@@ -105,7 +106,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
       "leaf node is transformable)") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"3")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 2158c2613..31517a141 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,8 +18,9 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{ColumnarOverrideRules, GlutenPlan}
+import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, 
InsertTransitions, TRANSFORM_UNSUPPORTED, TransformHints}
+import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.utils.QueryPlanSelector
 
 import org.apache.spark.rdd.RDD
@@ -31,7 +32,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   testGluten("Fall back the whole query if one unsupported") {
     withSQLConf(("spark.gluten.sql.columnar.query.fallback.threshold", "1")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark).withTransformRules(
+      val rule = new HeuristicApplier(spark).withTransformRules(
         List(
           _ =>
             _ => {
@@ -47,7 +48,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   testGluten("Fall back the whole plan if meeting the configured threshold") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"1")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(
@@ -65,7 +66,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   testGluten("Don't fall back the whole plan if NOT meeting the configured 
threshold") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"4")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(
@@ -85,7 +86,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
       " transformable)") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"2")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(
@@ -105,7 +106,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
       "leaf node is transformable)") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"3")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(
diff --git 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
index 451738759..079620bf8 100644
--- 
a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
+++ 
b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/FallbackStrategiesSuite.scala
@@ -18,8 +18,9 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.execution.BasicScanExecTransformer
-import org.apache.gluten.extension.{ColumnarOverrideRules, GlutenPlan}
+import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.extension.columnar.{FallbackEmptySchemaRelation, 
InsertTransitions, TRANSFORM_UNSUPPORTED, TransformHints}
+import org.apache.gluten.extension.columnar.heuristic.HeuristicApplier
 import org.apache.gluten.utils.QueryPlanSelector
 
 import org.apache.spark.rdd.RDD
@@ -32,7 +33,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   testGluten("Fall back the whole query if one unsupported") {
     withSQLConf(("spark.gluten.sql.columnar.query.fallback.threshold", "1")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark).withTransformRules(
+      val rule = new HeuristicApplier(spark).withTransformRules(
         List(
           _ =>
             _ => {
@@ -48,7 +49,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   testGluten("Fall back the whole plan if meeting the configured threshold") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"1")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(
@@ -66,7 +67,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
   testGluten("Don't fall back the whole plan if NOT meeting the configured 
threshold") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"4")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(
@@ -86,7 +87,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
       " transformable)") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"2")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(
@@ -106,7 +107,7 @@ class FallbackStrategiesSuite extends GlutenSQLTestsTrait {
       "leaf node is transformable)") {
     withSQLConf(("spark.gluten.sql.columnar.wholeStage.fallback.threshold", 
"3")) {
       val originalPlan = UnaryOp2(UnaryOp1(UnaryOp2(UnaryOp1(LeafOp()))))
-      val rule = ColumnarOverrideRules(spark)
+      val rule = new HeuristicApplier(spark)
         .enableAdaptiveContext()
         .withTransformRules(
           List(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to