This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 1cafa2826 [VL] Minor refactors on ColumnarRuleApplier (#6086)
1cafa2826 is described below
commit 1cafa2826e428479c0ccca94d7f16470ff5eea8a
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Jun 14 16:45:31 2024 +0800
[VL] Minor refactors on ColumnarRuleApplier (#6086)
---
.../extension/columnar/ColumnarRuleApplier.scala | 36 +++++++++++++++
.../columnar/enumerated/EnumeratedApplier.scala | 44 ++++++------------
.../columnar/heuristic/HeuristicApplier.scala | 52 ++++++++--------------
.../org/apache/spark/sql/SparkQueryRunner.scala | 39 +++++++++-------
4 files changed, 91 insertions(+), 80 deletions(-)
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
index 17bf01730..ee5bcd883 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala
@@ -16,8 +16,44 @@
*/
package org.apache.gluten.extension.columnar
+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.metrics.GlutenTimeMetric
+import org.apache.gluten.utils.LogLevelUtil
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.execution.SparkPlan
trait ColumnarRuleApplier {
def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan
}
+
+object ColumnarRuleApplier {
+ class Executor(phase: String, rules: Seq[Rule[SparkPlan]]) extends RuleExecutor[SparkPlan] {
+ private val batch: Batch =
+ Batch(s"Columnar (Phase [$phase])", Once, rules.map(r => new LoggedRule(r)): _*)
+
+ // TODO Remove this exclusion then pass Spark's idempotence check.
+ override protected val excludedOnceBatches: Set[String] = Set(batch.name)
+
+ override protected def batches: Seq[Batch] = List(batch)
+ }
+
+ private class LoggedRule(delegate: Rule[SparkPlan])
+ extends Rule[SparkPlan]
+ with Logging
+ with LogLevelUtil {
+ // Columnar plan change logging added since https://github.com/apache/incubator-gluten/pull/456.
+ private val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
+ override val ruleName: String = delegate.ruleName
+
+ override def apply(plan: SparkPlan): SparkPlan = GlutenTimeMetric.withMillisTime {
+ logOnLevel(
+ transformPlanLogLevel,
+ s"Preparing to apply rule $ruleName on plan:\n${plan.toString}")
+ val out = delegate.apply(plan)
+ logOnLevel(transformPlanLogLevel, s"Plan after applied rule $ruleName:\n${plan.toString}")
+ out
+ }(t => logOnLevel(transformPlanLogLevel, s"Applying rule $ruleName took $t ms."))
+ }
+}
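For context, the new ColumnarRuleApplier.Executor packs one phase's rules into a single once-run RuleExecutor batch, wrapping each rule in LoggedRule so per-rule timing and plan-change logging happen uniformly. A minimal usage sketch (not part of the commit; the helper name runPhase is a placeholder):

    import org.apache.gluten.extension.columnar.ColumnarRuleApplier
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.SparkPlan

    // Hypothetical helper: run one named phase of columnar rules through the shared executor.
    def runPhase(phase: String, rules: Seq[Rule[SparkPlan]], plan: SparkPlan): SparkPlan = {
      // Each phase becomes a single Once batch; LoggedRule adds the before/after plan logs.
      val executor = new ColumnarRuleApplier.Executor(phase, rules)
      executor.execute(plan)
    }

Both appliers changed below delegate their transformPlan to exactly this pattern.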
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index 26201dc1b..d5260f66a 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -22,13 +22,12 @@ import org.apache.gluten.extension.columnar._
import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTableCacheColumnarToRow, RemoveTopmostColumnarToRow, RewriteSubqueryBroadcast}
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.columnar.util.AdaptiveContext
-import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter, SparkPlan}
import org.apache.spark.util.SparkRuleUtil
@@ -47,41 +46,26 @@ class EnumeratedApplier(session: SparkSession)
with LogLevelUtil {
// An empirical value.
private val aqeStackTraceIndex = 16
-
- private lazy val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
- private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]()
-
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
PhysicalPlanSelector.maybe(session, plan) {
- val transformed = transformPlan(transformRules(outputsColumnar), plan, "transform")
+ val transformed =
+ transformPlan("transform", transformRules(outputsColumnar).map(_(session)), plan)
val postPlan = maybeAqe {
- transformPlan(postRules(), transformed, "post")
+ transformPlan("post", postRules().map(_(session)), transformed)
}
- val finalPlan = transformPlan(finalRules(), postPlan, "final")
+ val finalPlan = transformPlan("final", finalRules().map(_(session)), postPlan)
finalPlan
}
private def transformPlan(
- getRules: List[SparkSession => Rule[SparkPlan]],
- plan: SparkPlan,
- step: String) = GlutenTimeMetric.withMillisTime {
- logOnLevel(
- transformPlanLogLevel,
- s"${step}ColumnarTransitions preOverriden plan:\n${plan.toString}")
- val overridden = getRules.foldLeft(plan) {
- (p, getRule) =>
- val rule = getRule(session)
- val newPlan = rule(p)
- planChangeLogger.logRule(rule.ruleName, p, newPlan)
- newPlan
- }
- logOnLevel(
- transformPlanLogLevel,
- s"${step}ColumnarTransitions afterOverriden
plan:\n${overridden.toString}")
- overridden
- }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: $t ms."))
+ phase: String,
+ rules: Seq[Rule[SparkPlan]],
+ plan: SparkPlan): SparkPlan = {
+ val executor = new ColumnarRuleApplier.Executor(phase, rules)
+ executor.execute(plan)
+ }
private def maybeAqe[T](f: => T): T = {
adaptiveContext.setAdaptiveContext()
@@ -96,7 +80,7 @@ class EnumeratedApplier(session: SparkSession)
* Rules to let planner create a suggested Gluten plan being sent to `fallbackPolicies` in which
* the plan will be breakdown and decided to be fallen back or not.
*/
- private def transformRules(outputsColumnar: Boolean): List[SparkSession => Rule[SparkPlan]] = {
+ private def transformRules(outputsColumnar: Boolean): Seq[SparkSession => Rule[SparkPlan]] = {
List(
(_: SparkSession) => RemoveTransitions,
(spark: SparkSession) => FallbackOnANSIMode(spark),
@@ -126,7 +110,7 @@ class EnumeratedApplier(session: SparkSession)
* Rules applying to non-fallen-back Gluten plans. To do some post cleanup works on the plan to
* make sure it be able to run and be compatible with Spark's execution engine.
*/
- private def postRules(): List[SparkSession => Rule[SparkPlan]] =
+ private def postRules(): Seq[SparkSession => Rule[SparkPlan]] =
List(
(s: SparkSession) => RemoveTopmostColumnarToRow(s, adaptiveContext.isAdaptiveContext())) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() :::
@@ -137,7 +121,7 @@ class EnumeratedApplier(session: SparkSession)
* Rules consistently applying to all input plans after all other rules have been applied, despite
* whether the input plan is fallen back or not.
*/
- private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
+ private def finalRules(): Seq[SparkSession => Rule[SparkPlan]] = {
List(
// The rule is required despite whether the stage is fallen back or not. Since
// ColumnarCachedBatchSerializer is statically registered to Spark without a columnar rule
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index eb5c561bf..ad68786e6 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -23,12 +23,11 @@ import org.apache.gluten.extension.columnar.MiscColumnarRules.{RemoveGlutenTable
import org.apache.gluten.extension.columnar.rewrite.RewriteSparkPlanRulesManager
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.gluten.extension.columnar.util.AdaptiveContext
-import org.apache.gluten.metrics.GlutenTimeMetric
import org.apache.gluten.utils.{LogLevelUtil, PhysicalPlanSelector}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.rules.{PlanChangeLogger, Rule}
+import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, GlutenFallbackReporter, SparkPlan}
import org.apache.spark.util.SparkRuleUtil
@@ -42,54 +41,39 @@ class HeuristicApplier(session: SparkSession)
with LogLevelUtil {
// This is an empirical value, may need to be changed for supporting other versions of spark.
private val aqeStackTraceIndex = 19
-
- private lazy val transformPlanLogLevel = GlutenConfig.getConf.transformPlanLogLevel
- private lazy val planChangeLogger = new PlanChangeLogger[SparkPlan]()
-
private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
- override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan =
+ override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
withTransformRules(transformRules(outputsColumnar)).apply(plan)
+ }
// Visible for testing.
- def withTransformRules(transformRules: List[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] =
+ def withTransformRules(transformRules: Seq[SparkSession => Rule[SparkPlan]]): Rule[SparkPlan] =
plan =>
PhysicalPlanSelector.maybe(session, plan) {
val finalPlan = prepareFallback(plan) {
p =>
- val suggestedPlan = transformPlan(transformRules, p, "transform")
- transformPlan(fallbackPolicies(), suggestedPlan, "fallback") match {
+ val suggestedPlan = transformPlan("transform", transformRules.map(_(session)), p)
+ transformPlan("fallback", fallbackPolicies().map(_(session)), suggestedPlan) match {
case FallbackNode(fallbackPlan) =>
// we should use vanilla c2r rather than native c2r,
// and there should be no `GlutenPlan` any more,
// so skip the `postRules()`.
fallbackPlan
case plan =>
- transformPlan(postRules(), plan, "post")
+ transformPlan("post", postRules().map(_(session)), plan)
}
}
- transformPlan(finalRules(), finalPlan, "final")
+ transformPlan("final", finalRules().map(_(session)), finalPlan)
}
private def transformPlan(
- getRules: List[SparkSession => Rule[SparkPlan]],
- plan: SparkPlan,
- step: String) = GlutenTimeMetric.withMillisTime {
- logOnLevel(
- transformPlanLogLevel,
- s"${step}ColumnarTransitions preOverridden plan:\n${plan.toString}")
- val overridden = getRules.foldLeft(plan) {
- (p, getRule) =>
- val rule = getRule(session)
- val newPlan = rule(p)
- planChangeLogger.logRule(rule.ruleName, p, newPlan)
- newPlan
- }
- logOnLevel(
- transformPlanLogLevel,
- s"${step}ColumnarTransitions afterOverridden
plan:\n${overridden.toString}")
- overridden
- }(t => logOnLevel(transformPlanLogLevel, s"${step}Transform SparkPlan took: $t ms."))
+ phase: String,
+ rules: Seq[Rule[SparkPlan]],
+ plan: SparkPlan): SparkPlan = {
+ val executor = new ColumnarRuleApplier.Executor(phase, rules)
+ executor.execute(plan)
+ }
private def prepareFallback[T](plan: SparkPlan)(f: SparkPlan => T): T = {
adaptiveContext.setAdaptiveContext()
@@ -106,7 +90,7 @@ class HeuristicApplier(session: SparkSession)
* Rules to let planner create a suggested Gluten plan being sent to `fallbackPolicies` in which
* the plan will be breakdown and decided to be fallen back or not.
*/
- private def transformRules(outputsColumnar: Boolean): List[SparkSession => Rule[SparkPlan]] = {
+ private def transformRules(outputsColumnar: Boolean): Seq[SparkSession => Rule[SparkPlan]] = {
List(
(_: SparkSession) => RemoveTransitions,
(spark: SparkSession) => FallbackOnANSIMode(spark),
@@ -138,7 +122,7 @@ class HeuristicApplier(session: SparkSession)
* Rules to add wrapper `FallbackNode`s on top of the input plan, as hints to make planner fall
* back the whole input plan to the original vanilla Spark plan.
*/
- private def fallbackPolicies(): List[SparkSession => Rule[SparkPlan]] = {
+ private def fallbackPolicies(): Seq[SparkSession => Rule[SparkPlan]] = {
List(
(_: SparkSession) =>
ExpandFallbackPolicy(adaptiveContext.isAdaptiveContext(), adaptiveContext.originalPlan()))
@@ -148,7 +132,7 @@ class HeuristicApplier(session: SparkSession)
* Rules applying to non-fallen-back Gluten plans. To do some post cleanup works on the plan to
* make sure it be able to run and be compatible with Spark's execution engine.
*/
- private def postRules(): List[SparkSession => Rule[SparkPlan]] =
+ private def postRules(): Seq[SparkSession => Rule[SparkPlan]] =
List(
(s: SparkSession) => RemoveTopmostColumnarToRow(s, adaptiveContext.isAdaptiveContext())) :::
BackendsApiManager.getSparkPlanExecApiInstance.genExtendedColumnarPostRules() :::
@@ -159,7 +143,7 @@ class HeuristicApplier(session: SparkSession)
* Rules consistently applying to all input plans after all other rules have been applied, despite
* whether the input plan is fallen back or not.
*/
- private def finalRules(): List[SparkSession => Rule[SparkPlan]] = {
+ private def finalRules(): Seq[SparkSession => Rule[SparkPlan]] = {
List(
// The rule is required despite whether the stage is fallen back or not. Since
// ColumnarCachedBatchSerializer is statically registered to Spark without a columnar rule
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
index bb11a679f..b68f74c1d 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/SparkQueryRunner.scala
@@ -18,13 +18,9 @@ package org.apache.spark.sql
import org.apache.spark.{SparkContext, Success, TaskKilled}
import org.apache.spark.executor.ExecutorMetrics
-import org.apache.spark.scheduler.{
- SparkListener,
- SparkListenerExecutorMetricsUpdate,
- SparkListenerTaskEnd,
- SparkListenerTaskStart
-}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.sql.KillTaskListener.INIT_WAIT_TIME_MS
+import org.apache.spark.sql.catalyst.QueryPlanningTracker
import com.google.common.base.Preconditions
import org.apache.commons.lang3.RandomUtils
@@ -50,7 +46,8 @@ object SparkQueryRunner {
"ProcessTreePythonVMemory",
"ProcessTreePythonRSSMemory",
"ProcessTreeOtherVMemory",
- "ProcessTreeOtherRSSMemory")
+ "ProcessTreeOtherRSSMemory"
+ )
def runQuery(
spark: SparkSession,
@@ -82,25 +79,33 @@ object SparkQueryRunner {
println(s"Executing SQL query from resource path $queryPath...")
try {
+ val tracker = new QueryPlanningTracker
val sql = resourceToString(queryPath)
val prev = System.nanoTime()
val df = spark.sql(sql)
- val rows = df.collect()
+ val rows = QueryPlanningTracker.withTracker(tracker) {
+ df.collect()
+ }
if (explain) {
df.explain(extended = true)
}
- val planMillis =
- df.queryExecution.tracker.phases.values.map(p => p.endTimeMs - p.startTimeMs).sum
+ val sparkTracker = df.queryExecution.tracker
+ val sparkRulesMillis =
+ sparkTracker.rules.map(_._2.totalTimeNs).sum / 1000000L
+ val otherRulesMillis =
+ tracker.rules.map(_._2.totalTimeNs).sum / 1000000L
+ val planMillis = sparkRulesMillis + otherRulesMillis
val totalMillis = (System.nanoTime() - prev) / 1000000L
val collectedMetrics = metrics.map(name => (name, em.getMetricValue(name))).toMap
RunResult(rows, planMillis, totalMillis - planMillis, collectedMetrics)
} finally {
sc.removeSparkListener(metricsListener)
- killTaskListener.foreach(l => {
- sc.removeSparkListener(l)
- println(s"Successful kill rate ${"%.2f%%"
- .format(100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}")
- })
+ killTaskListener.foreach(
+ l => {
+ sc.removeSparkListener(l)
+ println(s"Successful kill rate ${"%.2f%%"
+ .format(100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}")
+ })
sc.setJobDescription(null)
}
}
@@ -166,7 +171,8 @@ class KillTaskListener(val sc: SparkContext) extends SparkListener {
val total = Math.min(
stageKillMaxWaitTimeLookup.computeIfAbsent(taskStart.stageId, _ => Long.MaxValue),
stageKillWaitTimeLookup
- .computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS))
+ .computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS)
+ )
val elapsed = System.currentTimeMillis() - startMs
val remaining = total - elapsed
if (remaining <= 0L) {
@@ -180,6 +186,7 @@ class KillTaskListener(val sc: SparkContext) extends SparkListener {
}
throw new IllegalStateException()
}
+
val elapsed = wait()
// We have 50% chance to kill the task. FIXME make it configurable?
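On the gluten-it side, planning time is no longer derived from the tracker's phase wall-clock; it is rebuilt from per-rule timings, combining Spark's own QueryPlanningTracker with an extra tracker installed around collect() so rules fired during execution-time planning are counted too. A condensed sketch of that measurement (the helper collectWithPlanMillis is illustrative, not from the patch):

    import org.apache.spark.sql.{DataFrame, Row}
    import org.apache.spark.sql.catalyst.QueryPlanningTracker

    // Hypothetical helper mirroring the new measurement in SparkQueryRunner.
    def collectWithPlanMillis(df: DataFrame): (Array[Row], Long) = {
      val tracker = new QueryPlanningTracker
      // Rules executed while collect() plans and runs the query are recorded here.
      val rows = QueryPlanningTracker.withTracker(tracker) { df.collect() }
      def millis(t: QueryPlanningTracker): Long = t.rules.map(_._2.totalTimeNs).sum / 1000000L
      // Spark's own tracker covers analysis/optimization; the extra tracker covers the rest.
      (rows, millis(df.queryExecution.tracker) + millis(tracker))
    }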
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]