This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 29390e1e4 [GLUTEN-6748][CORE] Search stack trace to infer adaptive execution context (#7121)
29390e1e4 is described below
commit 29390e1e43117275a8bdfe58a12b2670a4e556d0
Author: PHILO-HE <[email protected]>
AuthorDate: Thu Sep 5 13:18:17 2024 +0800
[GLUTEN-6748][CORE] Search stack trace to infer adaptive execution context (#7121)
Closes #6748
---
.../columnar/enumerated/EnumeratedApplier.scala | 9 +--------
.../extension/columnar/heuristic/HeuristicApplier.scala | 9 +--------
.../extension/columnar/util/AdaptiveContext.scala | 17 +++++------------
3 files changed, 7 insertions(+), 28 deletions(-)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index bebce3a61..6ce4e24ed 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -40,14 +40,7 @@ class EnumeratedApplier(session: SparkSession, ruleBuilders: Seq[ColumnarRuleBui
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
- // An empirical value.
- private val aqeStackTraceIndex =
- if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
- 16
- } else {
- 14
- }
- private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
+ private val adaptiveContext = AdaptiveContext(session)
override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index dea9f01df..85f44878f 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -39,14 +39,7 @@ class HeuristicApplier(
extends ColumnarRuleApplier
with Logging
with LogLevelUtil {
- // This is an empirical value, may need to be changed for supporting other versions of spark.
- private val aqeStackTraceIndex =
- if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
- 19
- } else {
- 17
- }
- private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
+ private val adaptiveContext = AdaptiveContext(session)
override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
index e1f594fd3..de72bc4bc 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
@@ -34,8 +34,8 @@ sealed trait AdaptiveContext {
}
object AdaptiveContext {
- def apply(session: SparkSession, aqeStackTraceIndex: Int): AdaptiveContext =
- new AdaptiveContextImpl(session, aqeStackTraceIndex)
+ def apply(session: SparkSession): AdaptiveContext =
+ new AdaptiveContextImpl(session)
private val GLUTEN_IS_ADAPTIVE_CONTEXT = "gluten.isAdaptiveContext"
@@ -45,8 +45,7 @@ object AdaptiveContext {
private val localIsAdaptiveContextFlags: ThreadLocal[ListBuffer[Boolean]] =
ThreadLocal.withInitial(() => ListBuffer.empty[Boolean])
- private class AdaptiveContextImpl(session: SparkSession, aqeStackTraceIndex: Int)
- extends AdaptiveContext {
+ private class AdaptiveContextImpl(session: SparkSession) extends AdaptiveContext {
// Just for test use.
override def enableAdaptiveContext(): Unit = {
session.sparkContext.setLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT, "true")
@@ -60,19 +59,13 @@ object AdaptiveContext {
override def setAdaptiveContext(): Unit = {
val traceElements = Thread.currentThread.getStackTrace
- assert(
- traceElements.length > aqeStackTraceIndex,
- s"The number of stack trace elements is expected to be more than $aqeStackTraceIndex")
// ApplyColumnarRulesAndInsertTransitions is called by either QueryExecution or
// AdaptiveSparkPlanExec. So by checking the stack trace, we can know whether
- // columnar rule will be applied in adaptive execution context. This part of code
- // needs to be carefully checked when supporting higher versions of spark to make
- // sure the calling stack has not been changed.
+ // columnar rule will be applied in adaptive execution context.
localIsAdaptiveContextFlags
.get()
.prepend(
- traceElements(aqeStackTraceIndex).getClassName
- .equals(AdaptiveSparkPlanExec.getClass.getName))
+ traceElements.exists(_.getClassName.equals(AdaptiveSparkPlanExec.getClass.getName)))
}
override def resetAdaptiveContext(): Unit =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]