This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 29390e1e4 [GLUTEN-6748][CORE] Search stack trace to infer adaptive 
execution context (#7121)
29390e1e4 is described below

commit 29390e1e43117275a8bdfe58a12b2670a4e556d0
Author: PHILO-HE <[email protected]>
AuthorDate: Thu Sep 5 13:18:17 2024 +0800

    [GLUTEN-6748][CORE] Search stack trace to infer adaptive execution context 
(#7121)
    
    Closes #6748
---
 .../columnar/enumerated/EnumeratedApplier.scala         |  9 +--------
 .../extension/columnar/heuristic/HeuristicApplier.scala |  9 +--------
 .../extension/columnar/util/AdaptiveContext.scala       | 17 +++++------------
 3 files changed, 7 insertions(+), 28 deletions(-)

diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
index bebce3a61..6ce4e24ed 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/EnumeratedApplier.scala
@@ -40,14 +40,7 @@ class EnumeratedApplier(session: SparkSession, ruleBuilders: 
Seq[ColumnarRuleBui
   extends ColumnarRuleApplier
   with Logging
   with LogLevelUtil {
-  // An empirical value.
-  private val aqeStackTraceIndex =
-    if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
-      16
-    } else {
-      14
-    }
-  private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
+  private val adaptiveContext = AdaptiveContext(session)
 
   override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
     val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
index dea9f01df..85f44878f 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/heuristic/HeuristicApplier.scala
@@ -39,14 +39,7 @@ class HeuristicApplier(
   extends ColumnarRuleApplier
   with Logging
   with LogLevelUtil {
-  // This is an empirical value, may need to be changed for supporting other 
versions of spark.
-  private val aqeStackTraceIndex =
-    if (scala.util.Properties.releaseVersion.exists(_.startsWith("2.12"))) {
-      19
-    } else {
-      17
-    }
-  private val adaptiveContext = AdaptiveContext(session, aqeStackTraceIndex)
+  private val adaptiveContext = AdaptiveContext(session)
 
   override def apply(plan: SparkPlan, outputsColumnar: Boolean): SparkPlan = {
     val call = new ColumnarRuleCall(session, adaptiveContext, outputsColumnar)
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
index e1f594fd3..de72bc4bc 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/util/AdaptiveContext.scala
@@ -34,8 +34,8 @@ sealed trait AdaptiveContext {
 }
 
 object AdaptiveContext {
-  def apply(session: SparkSession, aqeStackTraceIndex: Int): AdaptiveContext =
-    new AdaptiveContextImpl(session, aqeStackTraceIndex)
+  def apply(session: SparkSession): AdaptiveContext =
+    new AdaptiveContextImpl(session)
 
   private val GLUTEN_IS_ADAPTIVE_CONTEXT = "gluten.isAdaptiveContext"
 
@@ -45,8 +45,7 @@ object AdaptiveContext {
   private val localIsAdaptiveContextFlags: ThreadLocal[ListBuffer[Boolean]] =
     ThreadLocal.withInitial(() => ListBuffer.empty[Boolean])
 
-  private class AdaptiveContextImpl(session: SparkSession, aqeStackTraceIndex: 
Int)
-    extends AdaptiveContext {
+  private class AdaptiveContextImpl(session: SparkSession) extends 
AdaptiveContext {
     // Just for test use.
     override def enableAdaptiveContext(): Unit = {
       session.sparkContext.setLocalProperty(GLUTEN_IS_ADAPTIVE_CONTEXT, "true")
@@ -60,19 +59,13 @@ object AdaptiveContext {
 
     override def setAdaptiveContext(): Unit = {
       val traceElements = Thread.currentThread.getStackTrace
-      assert(
-        traceElements.length > aqeStackTraceIndex,
-        s"The number of stack trace elements is expected to be more than 
$aqeStackTraceIndex")
       // ApplyColumnarRulesAndInsertTransitions is called by either 
QueryExecution or
       // AdaptiveSparkPlanExec. So by checking the stack trace, we can know 
whether
-      // columnar rule will be applied in adaptive execution context. This 
part of code
-      // needs to be carefully checked when supporting higher versions of 
spark to make
-      // sure the calling stack has not been changed.
+      // columnar rule will be applied in adaptive execution context.
       localIsAdaptiveContextFlags
         .get()
         .prepend(
-          traceElements(aqeStackTraceIndex).getClassName
-            .equals(AdaptiveSparkPlanExec.getClass.getName))
+          
traceElements.exists(_.getClassName.equals(AdaptiveSparkPlanExec.getClass.getName)))
     }
 
     override def resetAdaptiveContext(): Unit =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to