This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 27b9976d54 [GLUTEN-7604][CORE] Code refactors against ColumnarRuleApplier.Executor (#7606)
27b9976d54 is described below
commit 27b9976d54c128dba01fa4dfe53ab00000cc9aa9
Author: Jiaan Geng <[email protected]>
AuthorDate: Tue Oct 22 08:11:41 2024 +0800
[GLUTEN-7604][CORE] Code refactors against ColumnarRuleApplier.Executor (#7606)
Closes #7604
Co-authored-by: Hongze Zhang <[email protected]>
---
.../extension/columnar/ColumnarRuleApplier.scala | 38 +-----------------
...uleApplier.scala => ColumnarRuleExecutor.scala} | 45 +++++++---------------
.../columnar/enumerated/EnumeratedApplier.scala | 6 +--
.../columnar/heuristic/HeuristicApplier.scala | 18 ++++-----
4 files changed, 24 insertions(+), 83 deletions(-)
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index d9ea4b25c0..d275c58564 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -18,13 +18,9 @@ package org.apache.gluten.extension.columnar
import org.apache.gluten.GlutenConfig
import org.apache.gluten.extension.util.AdaptiveContext
-import org.apache.gluten.logging.LogLevelUtil
-import org.apache.gluten.metrics.GlutenTimeMetric
-import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
-import org.apache.spark.sql.catalyst.util.sideBySide
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
trait ColumnarRuleApplier {
@@ -43,38 +39,6 @@ object ColumnarRuleApplier {
}
}
- class Executor(phase: String, rules: Seq[Rule[SparkPlan]]) extends
RuleExecutor[SparkPlan] {
- private val batch: Batch =
- Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new
LoggedRule(r)): _*)
-
- // TODO Remove this exclusion then pass Spark's idempotence check.
- override protected val excludedOnceBatches: Set[String] = Set(batch.name)
-
- override protected def batches: Seq[Batch] = List(batch)
- }
-
- private class LoggedRule(delegate: Rule[SparkPlan])
- extends Rule[SparkPlan]
- with Logging
- with LogLevelUtil {
-
- override val ruleName: String = delegate.ruleName
-
- private def message(oldPlan: SparkPlan, newPlan: SparkPlan, millisTime:
Long): String =
- if (!oldPlan.fastEquals(newPlan)) {
- s"""
- |=== Applying Rule $ruleName took $millisTime ms ===
- |${sideBySide(oldPlan.treeString,
newPlan.treeString).mkString("\n")}
- """.stripMargin
- } else { s"Rule $ruleName has no effect, took $millisTime ms." }
-
- override def apply(plan: SparkPlan): SparkPlan = {
- val (out, millisTime) =
GlutenTimeMetric.recordMillisTime(delegate.apply(plan))
- logOnLevel(GlutenConfig.getConf.transformPlanLogLevel, message(plan,
out, millisTime))
- out
- }
- }
-
// A temporary workaround for applying toggle `spark.gluten.enabled`, to be removed.
trait SkipCondition {
// True if the rule execution should be skipped.
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
similarity index 61%
copy from gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
copy to gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
index d9ea4b25c0..eea698e078 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleExecutor.scala
@@ -17,42 +17,27 @@
package org.apache.gluten.extension.columnar
import org.apache.gluten.GlutenConfig
-import org.apache.gluten.extension.util.AdaptiveContext
import org.apache.gluten.logging.LogLevelUtil
import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.execution.SparkPlan
-trait ColumnarRuleApplier {
- def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan
-}
-
-object ColumnarRuleApplier {
- type ColumnarRuleBuilder = ColumnarRuleCall => Rule[SparkPlan]
-
- class ColumnarRuleCall(
- val session: SparkSession,
- val ac: AdaptiveContext,
- val outputsColumnar: Boolean) {
- val conf: GlutenConfig = {
- new GlutenConfig(session.sessionState.conf)
- }
- }
+class ColumnarRuleExecutor(phase: String, rules: Seq[Rule[SparkPlan]])
+ extends RuleExecutor[SparkPlan] {
+ import ColumnarRuleExecutor._
+ private val batch: Batch =
+ Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new
LoggedRule(r)): _*)
- class Executor(phase: String, rules: Seq[Rule[SparkPlan]]) extends
RuleExecutor[SparkPlan] {
- private val batch: Batch =
- Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new
LoggedRule(r)): _*)
+ // TODO: Remove this exclusion then manage to pass Spark's idempotence check.
+ override protected val excludedOnceBatches: Set[String] = Set(batch.name)
- // TODO Remove this exclusion then pass Spark's idempotence check.
- override protected val excludedOnceBatches: Set[String] = Set(batch.name)
-
- override protected def batches: Seq[Batch] = List(batch)
- }
+ override protected def batches: Seq[Batch] = Seq(batch)
+}
+object ColumnarRuleExecutor {
private class LoggedRule(delegate: Rule[SparkPlan])
extends Rule[SparkPlan]
with Logging
@@ -66,7 +51,9 @@ object ColumnarRuleApplier {
|=== Applying Rule $ruleName took $millisTime ms ===
|${sideBySide(oldPlan.treeString,
newPlan.treeString).mkString("\n")}
""".stripMargin
- } else { s"Rule $ruleName has no effect, took $millisTime ms." }
+ } else {
+ s"Rule $ruleName has no effect, took $millisTime ms."
+ }
override def apply(plan: SparkPlan): SparkPlan = {
val (out, millisTime) =
GlutenTimeMetric.recordMillisTime(delegate.apply(plan))
@@ -74,10 +61,4 @@ object ColumnarRuleApplier {
out
}
}
-
- // A temporary workaround for applying toggle `spark.gluten.enabled`, to be removed.
- trait SkipCondition {
- // True if the rule execution should be skipped.
- def skip(session: SparkSession, plan: SparkPlan): Boolean
- }
}
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index de6ac2a7d9..c4d53653c0 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -56,10 +56,8 @@ class EnumeratedApplier(
finalPlan
}
- private def apply0(rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan
= {
- val executor = new ColumnarRuleApplier.Executor("ras", rules)
- executor.execute(plan)
- }
+ private def apply0(rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan =
+ new ColumnarRuleExecutor("ras", rules).execute(plan)
private def maybeAqe[T](f: => T): T = {
adaptiveContext.setAdaptiveContext()
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index 8e612c6aed..a039e9a562 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -52,10 +52,10 @@ class HeuristicApplier(
private def makeRule(call: ColumnarRuleCall): Rule[SparkPlan] = {
plan =>
- val finalPlan = prepareFallback(plan) {
+ prepareFallback(plan) {
p =>
val suggestedPlan = transformPlan("transform", transformRules(call),
p)
- transformPlan("fallback", fallbackPolicies(call), suggestedPlan)
match {
+ val finalPlan = transformPlan("fallback", fallbackPolicies(call),
suggestedPlan) match {
case FallbackNode(fallbackPlan) =>
// we should use vanilla c2r rather than native c2r,
// and there should be no `GlutenPlan` any more,
@@ -64,23 +64,21 @@ class HeuristicApplier(
case plan =>
transformPlan("post", postRules(call), plan)
}
+ transformPlan("final", finalRules(call), finalPlan)
}
- transformPlan("final", finalRules(call), finalPlan)
}
private def transformPlan(
phase: String,
rules: Seq[Rule[SparkPlan]],
- plan: SparkPlan): SparkPlan = {
- val executor = new ColumnarRuleApplier.Executor(phase, rules)
- executor.execute(plan)
- }
+ plan: SparkPlan): SparkPlan =
+ new ColumnarRuleExecutor(phase, rules).execute(plan)
- private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = {
+ private def prepareFallback[T](p: SparkPlan)(f: SparkPlan => T): T = {
adaptiveContext.setAdaptiveContext()
- adaptiveContext.setOriginalPlan(plan)
+ adaptiveContext.setOriginalPlan(p)
try {
- f(plan)
+ f(p)
} finally {
adaptiveContext.resetOriginalPlan()
adaptiveContext.resetAdaptiveContext()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]