This is an automated email from the ASF dual-hosted git repository.

ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 413473bb4 [CORE] Make collectQueryExecutionFallbackSummary as a public 
util method (#6679)
413473bb4 is described below

commit 413473bb4727bc0b99d20b61f4bdc0fa2ea66fd2
Author: Zhen Wang <[email protected]>
AuthorDate: Fri Aug 2 09:05:05 2024 +0800

    [CORE] Make collectQueryExecutionFallbackSummary as a public util method 
(#6679)
---
 .../spark/sql/execution/GlutenImplicits.scala      | 231 +++++++++++----------
 1 file changed, 117 insertions(+), 114 deletions(-)

diff --git 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index eb42f0a88..2e2af6517 100644
--- 
a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++ 
b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.execution.WholeStageTransformer
 import org.apache.gluten.extension.GlutenPlan
 import org.apache.gluten.utils.PlanUtil
 
-import org.apache.spark.sql.{AnalysisException, Dataset}
+import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
@@ -87,131 +87,134 @@ object GlutenImplicits {
     }
   }
 
-  implicit class DatasetTransformer[T](dateset: Dataset[T]) {
-    private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
-      val args = p.argString(Int.MaxValue)
-      val index = args.indexOf("isFinalPlan=")
-      assert(index >= 0)
-      args.substring(index + "isFinalPlan=".length).trim.toBoolean
-    }
+  private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
+    val args = p.argString(Int.MaxValue)
+    val index = args.indexOf("isFinalPlan=")
+    assert(index >= 0)
+    args.substring(index + "isFinalPlan=".length).trim.toBoolean
+  }
 
-    private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = {
-      var numGlutenNodes = 0
-      val fallbackNodeToReason = new mutable.HashMap[String, String]
-
-      def collect(tmp: QueryPlan[_]): Unit = {
-        tmp.foreachUp {
-          case _: ExecutedCommandExec =>
-          case _: CommandResultExec =>
-          case _: V2CommandExec =>
-          case _: DataWritingCommandExec =>
-          case _: WholeStageCodegenExec =>
-          case _: WholeStageTransformer =>
-          case _: InputAdapter =>
-          case _: ColumnarInputAdapter =>
-          case _: InputIteratorTransformer =>
-          case _: ColumnarToRowTransition =>
-          case _: RowToColumnarTransition =>
-          case p: ReusedExchangeExec =>
-          case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
-            collect(p.executedPlan)
-          case p: AdaptiveSparkPlanExec =>
-            // if we are here that means we are inside table cache.
-            val (innerNumGlutenNodes, innerFallbackNodeToReason) =
-              withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-                // re-plan manually to skip cached data
-                val newSparkPlan = QueryExecution.createSparkPlan(
-                  dateset.sparkSession,
-                  dateset.sparkSession.sessionState.planner,
-                  p.inputPlan.logicalLink.get)
-                val newExecutedPlan = QueryExecution.prepareExecutedPlan(
-                  dateset.sparkSession,
-                  newSparkPlan
-                )
-                processPlan(
-                  newExecutedPlan,
-                  new PlanStringConcat().append,
-                  Some(plan => collectFallbackNodes(plan)))
-              }
-            numGlutenNodes += innerNumGlutenNodes
-            fallbackNodeToReason.++=(innerFallbackNodeToReason)
-          case p: QueryStageExec => collect(p.plan)
-          case p: GlutenPlan =>
-            numGlutenNodes += 1
-            p.innerChildren.foreach(collect)
-          case i: InMemoryTableScanExec =>
-            if (PlanUtil.isGlutenTableCache(i)) {
-              numGlutenNodes += 1
-            } else {
-              addFallbackNodeWithReason(i, "Columnar table cache is disabled", 
fallbackNodeToReason)
+  private def collectFallbackNodes(spark: SparkSession, plan: QueryPlan[_]): 
FallbackInfo = {
+    var numGlutenNodes = 0
+    val fallbackNodeToReason = new mutable.HashMap[String, String]
+
+    def collect(tmp: QueryPlan[_]): Unit = {
+      tmp.foreachUp {
+        case _: ExecutedCommandExec =>
+        case _: CommandResultExec =>
+        case _: V2CommandExec =>
+        case _: DataWritingCommandExec =>
+        case _: WholeStageCodegenExec =>
+        case _: WholeStageTransformer =>
+        case _: InputAdapter =>
+        case _: ColumnarInputAdapter =>
+        case _: InputIteratorTransformer =>
+        case _: ColumnarToRowTransition =>
+        case _: RowToColumnarTransition =>
+        case p: ReusedExchangeExec =>
+        case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
+          collect(p.executedPlan)
+        case p: AdaptiveSparkPlanExec =>
+          // if we are here that means we are inside table cache.
+          val (innerNumGlutenNodes, innerFallbackNodeToReason) =
+            withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+              // re-plan manually to skip cached data
+              val newSparkPlan = QueryExecution.createSparkPlan(
+                spark,
+                spark.sessionState.planner,
+                p.inputPlan.logicalLink.get)
+              val newExecutedPlan = QueryExecution.prepareExecutedPlan(
+                spark,
+                newSparkPlan
+              )
+              processPlan(
+                newExecutedPlan,
+                new PlanStringConcat().append,
+                Some(plan => collectFallbackNodes(spark, plan)))
             }
-            collect(i.relation.cachedPlan)
-          case _: AQEShuffleReadExec => // Ignore
-          case p: SparkPlan =>
-            handleVanillaSparkPlan(p, fallbackNodeToReason)
-            p.innerChildren.foreach(collect)
-          case _ =>
-        }
+          numGlutenNodes += innerNumGlutenNodes
+          fallbackNodeToReason.++=(innerFallbackNodeToReason)
+        case p: QueryStageExec => collect(p.plan)
+        case p: GlutenPlan =>
+          numGlutenNodes += 1
+          p.innerChildren.foreach(collect)
+        case i: InMemoryTableScanExec =>
+          if (PlanUtil.isGlutenTableCache(i)) {
+            numGlutenNodes += 1
+          } else {
+            addFallbackNodeWithReason(i, "Columnar table cache is disabled", 
fallbackNodeToReason)
+          }
+          collect(i.relation.cachedPlan)
+        case _: AQEShuffleReadExec => // Ignore
+        case p: SparkPlan =>
+          handleVanillaSparkPlan(p, fallbackNodeToReason)
+          p.innerChildren.foreach(collect)
+        case _ =>
       }
-
-      collect(plan)
-      (numGlutenNodes, fallbackNodeToReason.toMap)
     }
 
-    private def collectQueryExecutionFallbackSummary(qe: QueryExecution): 
FallbackSummary = {
-      var totalNumGlutenNodes = 0
-      var totalNumFallbackNodes = 0
-      val totalPhysicalPlanDescription = new ArrayBuffer[String]()
-      val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()
-
-      def handlePlanWithAQEAndTableCache(
-          plan: SparkPlan,
-          logicalPlan: LogicalPlan,
-          isMaterialized: Boolean): Unit = {
-        val concat = new PlanStringConcat()
-        val collectFallbackFunc = Some(plan => collectFallbackNodes(plan))
-        val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
-          withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-            // AQE is not materialized, so the columnar rules are not applied.
-            // For this case, We apply columnar rules manually with disable 
AQE.
-            val qe = dateset.sparkSession.sessionState.executePlan(logicalPlan)
-            processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
-          }
-        } else {
-          processPlan(plan, concat.append, collectFallbackFunc)
-        }
-        totalNumGlutenNodes += numGlutenNodes
-        totalNumFallbackNodes += fallbackNodeToReason.size
-        totalPhysicalPlanDescription.append(concat.toString())
-        totalFallbackNodeToReason.append(fallbackNodeToReason)
-      }
+    collect(plan)
+    (numGlutenNodes, fallbackNodeToReason.toMap)
+  }
 
-      // For command-like query, e.g., `INSERT INTO TABLE ...`
-      qe.commandExecuted.foreach {
-        case r: CommandResult =>
-          handlePlanWithAQEAndTableCache(r.commandPhysicalPlan, 
r.commandLogicalPlan, true)
-        case _ => // ignore
+  // collect fallback summary from query execution, make this method public as 
a util method
+  def collectQueryExecutionFallbackSummary(
+      spark: SparkSession,
+      qe: QueryExecution): FallbackSummary = {
+    var totalNumGlutenNodes = 0
+    var totalNumFallbackNodes = 0
+    val totalPhysicalPlanDescription = new ArrayBuffer[String]()
+    val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()
+
+    def handlePlanWithAQEAndTableCache(
+        plan: SparkPlan,
+        logicalPlan: LogicalPlan,
+        isMaterialized: Boolean): Unit = {
+      val concat = new PlanStringConcat()
+      val collectFallbackFunc = Some(plan => collectFallbackNodes(spark, plan))
+      val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
+        withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+          // AQE is not materialized, so the columnar rules are not applied.
+          // For this case, we apply columnar rules manually with AQE disabled.
+          val qe = spark.sessionState.executePlan(logicalPlan)
+          processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
+        }
+      } else {
+        processPlan(plan, concat.append, collectFallbackFunc)
       }
+      totalNumGlutenNodes += numGlutenNodes
+      totalNumFallbackNodes += fallbackNodeToReason.size
+      totalPhysicalPlanDescription.append(concat.toString())
+      totalFallbackNodeToReason.append(fallbackNodeToReason)
+    }
 
-      // For query, e.g., `SELECT * FROM ...`
-      if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) {
-        val isMaterialized = qe.executedPlan.find {
-          case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
-          case _ => false
-        }.isDefined
-        handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed, 
isMaterialized)
-      }
+    // For command-like query, e.g., `INSERT INTO TABLE ...`
+    qe.commandExecuted.foreach {
+      case r: CommandResult =>
+        handlePlanWithAQEAndTableCache(r.commandPhysicalPlan, 
r.commandLogicalPlan, true)
+      case _ => // ignore
+    }
 
-      FallbackSummary(
-        totalNumGlutenNodes,
-        totalNumFallbackNodes,
-        totalPhysicalPlanDescription.toSeq,
-        totalFallbackNodeToReason.toSeq
-      )
+    // For query, e.g., `SELECT * FROM ...`
+    if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) {
+      val isMaterialized = qe.executedPlan.find {
+        case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
+        case _ => false
+      }.isDefined
+      handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed, 
isMaterialized)
     }
 
+    FallbackSummary(
+      totalNumGlutenNodes,
+      totalNumFallbackNodes,
+      totalPhysicalPlanDescription.toSeq,
+      totalFallbackNodeToReason.toSeq
+    )
+  }
+
+  implicit class DatasetTransformer[T](dateset: Dataset[T]) {
     def fallbackSummary(): FallbackSummary = {
-      collectQueryExecutionFallbackSummary(dateset.queryExecution)
+      collectQueryExecutionFallbackSummary(dateset.sparkSession, 
dateset.queryExecution)
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to