This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 27b9976d54 [GLUTEN-7604][CORE] Code refactors against ColumnarRuleApplier.Executor (#7606)
27b9976d54 is described below

commit 27b9976d54c128dba01fa4dfe53ab00000cc9aa9
Author: Jiaan Geng <[email protected]>
AuthorDate: Tue Oct 22 08:11:41 2024 +0800

    [GLUTEN-7604][CORE] Code refactors against ColumnarRuleApplier.Executor (#7606)
    
    Closes #7604
    
    Co-authored-by: Hongze Zhang <[email protected]>
---
 .../extension/columnar/ColumnarRuleApplier.scala   | 38 +-----------------
 ...uleApplier.scala => ColumnarRuleExecutor.scala} | 45 +++++++---------------
 .../columnar/enumerated/EnumeratedApplier.scala    |  6 +--
 .../columnar/heuristic/HeuristicApplier.scala      | 18 ++++-----
 4 files changed, 24 insertions(+), 83 deletions(-)

diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index d9ea4b25c0..d275c58564 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -18,13 +18,9 @@ package org.apache.gluten.extension.columnar
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.extension.util.AdaptiveContext
-import org.apache.gluten.logging.LogLevelUtil
-import org.apache.gluten.metrics.GlutenTimeMetric
 
-import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
-import org.apache.spark.sql.catalyst.util.sideBySide
+import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.SparkPlan
 
 trait ColumnarRuleApplier {
@@ -43,38 +39,6 @@ object ColumnarRuleApplier {
     }
   }
 
-  class Executor(phase: String, rules: Seq[Rule[SparkPlan]]) extends RuleExecutor[SparkPlan] {
-    private val batch: Batch =
-      Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new LoggedRule(r)): _*)
-
-    // TODO Remove this exclusion then pass Spark's idempotence check.
-    override protected val excludedOnceBatches: Set[String] = Set(batch.name)
-
-    override protected def batches: Seq[Batch] = List(batch)
-  }
-
-  private class LoggedRule(delegate: Rule[SparkPlan])
-    extends Rule[SparkPlan]
-    with Logging
-    with LogLevelUtil {
-
-    override val ruleName: String = delegate.ruleName
-
-    private def message(oldPlan: SparkPlan, newPlan: SparkPlan, millisTime: Long): String =
-      if (!oldPlan.fastEquals(newPlan)) {
-        s"""
-           |=== Applying Rule $ruleName took $millisTime ms ===
-           |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
-           """.stripMargin
-      } else { s"Rule $ruleName has no effect, took $millisTime ms." }
-
-    override def apply(plan: SparkPlan): SparkPlan = {
-      val (out, millisTime) = GlutenTimeMetric.recordMillisTime(delegate.apply(plan))
-      logOnLevel(GlutenConfig.getConf.transformPlanLogLevel, message(plan, out, millisTime))
-      out
-    }
-  }
-
   // A temporary workaround for applying toggle `spark.gluten.enabled`, to be removed.
   trait SkipCondition {
     // True if the rule execution should be skipped.
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
similarity index 61%
copy from gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
copy to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
index d9ea4b25c0..eea698e078 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
@@ -17,42 +17,27 @@
 package org.apache.gluten.extension.columnar
 
 import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.util.AdaptiveContext
 import org.apache.gluten.logging.LogLevelUtil
 import org.apache.gluten.metrics.GlutenTimeMetric
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.SparkPlan
 
-trait ColumnarRuleApplier {
-  def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan
-}
-
-object ColumnarRuleApplier {
-  type ColumnarRuleBuilder = ColumnarRuleCall => Rule[SparkPlan]
-
-  class ColumnarRuleCall(
-      val session: SparkSession,
-      val ac: AdaptiveContext,
-      val outputsColumnar: Boolean) {
-    val conf: GlutenConfig = {
-      new GlutenConfig(session.sessionState.conf)
-    }
-  }
+class ColumnarRuleExecutor(phase: String, rules: Seq[Rule[SparkPlan]])
+  extends RuleExecutor[SparkPlan] {
+  import ColumnarRuleExecutor._
+  private val batch: Batch =
+    Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new LoggedRule(r)): _*)
 
-  class Executor(phase: String, rules: Seq[Rule[SparkPlan]]) extends RuleExecutor[SparkPlan] {
-    private val batch: Batch =
-      Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new LoggedRule(r)): _*)
+  // TODO: Remove this exclusion then manage to pass Spark's idempotence check.
+  override protected val excludedOnceBatches: Set[String] = Set(batch.name)
 
-    // TODO Remove this exclusion then pass Spark's idempotence check.
-    override protected val excludedOnceBatches: Set[String] = Set(batch.name)
-
-    override protected def batches: Seq[Batch] = List(batch)
-  }
+  override protected def batches: Seq[Batch] = Seq(batch)
+}
 
+object ColumnarRuleExecutor {
   private class LoggedRule(delegate: Rule[SparkPlan])
     extends Rule[SparkPlan]
     with Logging
@@ -66,7 +51,9 @@ object ColumnarRuleApplier {
            |=== Applying Rule $ruleName took $millisTime ms ===
            |${sideBySide(oldPlan.treeString, newPlan.treeString).mkString("\n")}
            """.stripMargin
-      } else { s"Rule $ruleName has no effect, took $millisTime ms." }
+      } else {
+        s"Rule $ruleName has no effect, took $millisTime ms."
+      }
 
     override def apply(plan: SparkPlan): SparkPlan = {
       val (out, millisTime) = GlutenTimeMetric.recordMillisTime(delegate.apply(plan))
@@ -74,10 +61,4 @@ object ColumnarRuleApplier {
       out
     }
   }
-
-  // A temporary workaround for applying toggle `spark.gluten.enabled`, to be removed.
-  trait SkipCondition {
-    // True if the rule execution should be skipped.
-    def skip(session: SparkSession, plan: SparkPlan): Boolean
-  }
 }
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index de6ac2a7d9..c4d53653c0 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -56,10 +56,8 @@ class EnumeratedApplier(
     finalPlan
   }
 
-  private def apply0(rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan = {
-    val executor = new ColumnarRuleApplier.Executor("ras", rules)
-    executor.execute(plan)
-  }
+  private def apply0(rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan =
+    new ColumnarRuleExecutor("ras", rules).execute(plan)
 
   private def maybeAqe[T](f: => T): T = {
     adaptiveContext.setAdaptiveContext()
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 8e612c6aed..a039e9a562 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -52,10 +52,10 @@ class HeuristicApplier(
 
   private def makeRule(call: ColumnarRuleCall): Rule[SparkPlan] = {
     plan =>
-      val finalPlan = prepareFallback(plan) {
+      prepareFallback(plan) {
         p =>
           val suggestedPlan = transformPlan("transform", transformRules(call), p)
-          transformPlan("fallback", fallbackPolicies(call), suggestedPlan) match {
+          val finalPlan = transformPlan("fallback", fallbackPolicies(call), suggestedPlan) match {
             case FallbackNode(fallbackPlan) =>
               // we should use vanilla c2r rather than native c2r,
               // and there should be no `GlutenPlan` any more,
@@ -64,23 +64,21 @@ class HeuristicApplier(
             case plan =>
               transformPlan("post", postRules(call), plan)
           }
+          transformPlan("final", finalRules(call), finalPlan)
       }
-      transformPlan("final", finalRules(call), finalPlan)
   }
 
   private def transformPlan(
       phase: String,
       rules: Seq[Rule[SparkPlan]],
-      plan: SparkPlan): SparkPlan = {
-    val executor = new ColumnarRuleApplier.Executor(phase, rules)
-    executor.execute(plan)
-  }
+      plan: SparkPlan): SparkPlan =
+    new ColumnarRuleExecutor(phase, rules).execute(plan)
 
-  private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = {
+  private def prepareFallback[T](p: SparkPlan)(f: SparkPlan => T): T = {
     adaptiveContext.setAdaptiveContext()
-    adaptiveContext.setOriginalPlan(plan)
+    adaptiveContext.setOriginalPlan(p)
     try {
-      f(plan)
+      f(p)
     } finally {
       adaptiveContext.resetOriginalPlan()
       adaptiveContext.resetAdaptiveContext()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to