This is an automated email from the ASF dual-hosted git repository.
ulyssesyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 413473bb4 [CORE] Make collectQueryExecutionFallbackSummary as a public
util method (#6679)
413473bb4 is described below
commit 413473bb4727bc0b99d20b61f4bdc0fa2ea66fd2
Author: Zhen Wang <[email protected]>
AuthorDate: Fri Aug 2 09:05:05 2024 +0800
[CORE] Make collectQueryExecutionFallbackSummary as a public util method
(#6679)
---
.../spark/sql/execution/GlutenImplicits.scala | 231 +++++++++++----------
1 file changed, 117 insertions(+), 114 deletions(-)
diff --git a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index eb42f0a88..2e2af6517 100644
--- a/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++ b/gluten-core/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -20,7 +20,7 @@ import org.apache.gluten.execution.WholeStageTransformer
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.utils.PlanUtil
-import org.apache.spark.sql.{AnalysisException, Dataset}
+import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
@@ -87,131 +87,134 @@ object GlutenImplicits {
}
}
- implicit class DatasetTransformer[T](dateset: Dataset[T]) {
- private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
- val args = p.argString(Int.MaxValue)
- val index = args.indexOf("isFinalPlan=")
- assert(index >= 0)
- args.substring(index + "isFinalPlan=".length).trim.toBoolean
- }
+ private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = { // reads the "isFinalPlan=<bool>" flag from p.argString(Int.MaxValue)
+ val args = p.argString(Int.MaxValue)
+ val index = args.indexOf("isFinalPlan=")
+ assert(index >= 0) // fails when argString carries no isFinalPlan= flag
+ args.substring(index + "isFinalPlan=".length).trim.toBoolean // NOTE(review): parses everything after the flag, so assumes it is the last argument (toBoolean throws otherwise) - TODO confirm
+ }
- private def collectFallbackNodes(plan: QueryPlan[_]): FallbackInfo = {
- var numGlutenNodes = 0
- val fallbackNodeToReason = new mutable.HashMap[String, String]
-
- def collect(tmp: QueryPlan[_]): Unit = {
- tmp.foreachUp {
- case _: ExecutedCommandExec =>
- case _: CommandResultExec =>
- case _: V2CommandExec =>
- case _: DataWritingCommandExec =>
- case _: WholeStageCodegenExec =>
- case _: WholeStageTransformer =>
- case _: InputAdapter =>
- case _: ColumnarInputAdapter =>
- case _: InputIteratorTransformer =>
- case _: ColumnarToRowTransition =>
- case _: RowToColumnarTransition =>
- case p: ReusedExchangeExec =>
- case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) =>
- collect(p.executedPlan)
- case p: AdaptiveSparkPlanExec =>
- // if we are here that means we are inside table cache.
- val (innerNumGlutenNodes, innerFallbackNodeToReason) =
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
- // re-plan manually to skip cached data
- val newSparkPlan = QueryExecution.createSparkPlan(
- dateset.sparkSession,
- dateset.sparkSession.sessionState.planner,
- p.inputPlan.logicalLink.get)
- val newExecutedPlan = QueryExecution.prepareExecutedPlan(
- dateset.sparkSession,
- newSparkPlan
- )
- processPlan(
- newExecutedPlan,
- new PlanStringConcat().append,
- Some(plan => collectFallbackNodes(plan)))
- }
- numGlutenNodes += innerNumGlutenNodes
- fallbackNodeToReason.++=(innerFallbackNodeToReason)
- case p: QueryStageExec => collect(p.plan)
- case p: GlutenPlan =>
- numGlutenNodes += 1
- p.innerChildren.foreach(collect)
- case i: InMemoryTableScanExec =>
- if (PlanUtil.isGlutenTableCache(i)) {
- numGlutenNodes += 1
- } else {
- addFallbackNodeWithReason(i, "Columnar table cache is disabled",
fallbackNodeToReason)
+ private def collectFallbackNodes(spark: SparkSession, plan: QueryPlan[_]):
FallbackInfo = { // returns (numGlutenNodes, fallbackNodeToReason) gathered from `plan`; `spark` is used for re-planning
+ var numGlutenNodes = 0
+ val fallbackNodeToReason = new mutable.HashMap[String, String]
+
+ def collect(tmp: QueryPlan[_]): Unit = {
+ tmp.foreachUp {
+ case _: ExecutedCommandExec => // nodes matched by the empty cases below contribute nothing themselves
+ case _: CommandResultExec =>
+ case _: V2CommandExec =>
+ case _: DataWritingCommandExec =>
+ case _: WholeStageCodegenExec =>
+ case _: WholeStageTransformer =>
+ case _: InputAdapter =>
+ case _: ColumnarInputAdapter =>
+ case _: InputIteratorTransformer =>
+ case _: ColumnarToRowTransition =>
+ case _: RowToColumnarTransition =>
+ case p: ReusedExchangeExec => // NOTE(review): binding `p` is unused; `_` would match the neighbouring cases
+ case p: AdaptiveSparkPlanExec if isFinalAdaptivePlan(p) => // final AQE plan: walk its executed plan
+ collect(p.executedPlan)
+ case p: AdaptiveSparkPlanExec =>
+ // if we are here that means we are inside table cache.
+ val (innerNumGlutenNodes, innerFallbackNodeToReason) =
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ // re-plan manually to skip cached data
+ val newSparkPlan = QueryExecution.createSparkPlan(
+ spark,
+ spark.sessionState.planner,
+ p.inputPlan.logicalLink.get) // NOTE(review): throws if inputPlan carries no logical link
+ val newExecutedPlan = QueryExecution.prepareExecutedPlan(
+ spark,
+ newSparkPlan
+ )
+ processPlan(
+ newExecutedPlan,
+ new PlanStringConcat().append,
+ Some(plan => collectFallbackNodes(spark, plan))) // recurse with the same session
}
- collect(i.relation.cachedPlan)
- case _: AQEShuffleReadExec => // Ignore
- case p: SparkPlan =>
- handleVanillaSparkPlan(p, fallbackNodeToReason)
- p.innerChildren.foreach(collect)
- case _ =>
- }
+ numGlutenNodes += innerNumGlutenNodes
+ fallbackNodeToReason.++=(innerFallbackNodeToReason)
+ case p: QueryStageExec => collect(p.plan) // descend into the plan wrapped by the query stage
+ case p: GlutenPlan => // counted as a Gluten node; its innerChildren are walked as well
+ numGlutenNodes += 1
+ p.innerChildren.foreach(collect)
+ case i: InMemoryTableScanExec => // table cache: Gluten only when PlanUtil reports a Gluten table cache
+ if (PlanUtil.isGlutenTableCache(i)) {
+ numGlutenNodes += 1
+ } else {
+ addFallbackNodeWithReason(i, "Columnar table cache is disabled",
fallbackNodeToReason)
+ }
+ collect(i.relation.cachedPlan) // also walk the plan that backs the cached relation
+ case _: AQEShuffleReadExec => // Ignore
+ case p: SparkPlan => // any other plan node is delegated to handleVanillaSparkPlan
+ handleVanillaSparkPlan(p, fallbackNodeToReason)
+ p.innerChildren.foreach(collect)
+ case _ => // nodes that are not SparkPlans are ignored
}
-
- collect(plan)
- (numGlutenNodes, fallbackNodeToReason.toMap)
}
- private def collectQueryExecutionFallbackSummary(qe: QueryExecution):
FallbackSummary = {
- var totalNumGlutenNodes = 0
- var totalNumFallbackNodes = 0
- val totalPhysicalPlanDescription = new ArrayBuffer[String]()
- val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()
-
- def handlePlanWithAQEAndTableCache(
- plan: SparkPlan,
- logicalPlan: LogicalPlan,
- isMaterialized: Boolean): Unit = {
- val concat = new PlanStringConcat()
- val collectFallbackFunc = Some(plan => collectFallbackNodes(plan))
- val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
- withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
- // AQE is not materialized, so the columnar rules are not applied.
- // For this case, We apply columnar rules manually with disable
AQE.
- val qe = dateset.sparkSession.sessionState.executePlan(logicalPlan)
- processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
- }
- } else {
- processPlan(plan, concat.append, collectFallbackFunc)
- }
- totalNumGlutenNodes += numGlutenNodes
- totalNumFallbackNodes += fallbackNodeToReason.size
- totalPhysicalPlanDescription.append(concat.toString())
- totalFallbackNodeToReason.append(fallbackNodeToReason)
- }
+ collect(plan)
+ (numGlutenNodes, fallbackNodeToReason.toMap)
+ }
- // For command-like query, e.g., `INSERT INTO TABLE ...`
- qe.commandExecuted.foreach {
- case r: CommandResult =>
- handlePlanWithAQEAndTableCache(r.commandPhysicalPlan,
r.commandLogicalPlan, true)
- case _ => // ignore
+ // Collect fallback summary from query execution; this method is public so it can be reused as
a util method
+ def collectQueryExecutionFallbackSummary(
+ spark: SparkSession,
+ qe: QueryExecution): FallbackSummary = { // `spark` is used to re-plan `qe` when its AQE plan is not final
+ var totalNumGlutenNodes = 0
+ var totalNumFallbackNodes = 0
+ val totalPhysicalPlanDescription = new ArrayBuffer[String]()
+ val totalFallbackNodeToReason = new ArrayBuffer[Map[String, String]]()
+
+ def handlePlanWithAQEAndTableCache( // processes one plan and adds its counts, description and reasons to the totals
+ plan: SparkPlan,
+ logicalPlan: LogicalPlan,
+ isMaterialized: Boolean): Unit = {
+ val concat = new PlanStringConcat()
+ val collectFallbackFunc = Some(plan => collectFallbackNodes(spark, plan))
+ val (numGlutenNodes, fallbackNodeToReason) = if (!isMaterialized) {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+ // AQE is not materialized, so the columnar rules are not applied.
+ // For this case, We apply columnar rules manually with disable AQE.
+ val qe = spark.sessionState.executePlan(logicalPlan)
+ processPlan(qe.executedPlan, concat.append, collectFallbackFunc)
+ }
+ } else {
+ processPlan(plan, concat.append, collectFallbackFunc) // materialized: `plan` is used as-is
}
+ totalNumGlutenNodes += numGlutenNodes
+ totalNumFallbackNodes += fallbackNodeToReason.size
+ totalPhysicalPlanDescription.append(concat.toString())
+ totalFallbackNodeToReason.append(fallbackNodeToReason)
+ }
- // For query, e.g., `SELECT * FROM ...`
- if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) {
- val isMaterialized = qe.executedPlan.find {
- case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
- case _ => false
- }.isDefined
- handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed,
isMaterialized)
- }
+ // For command-like query, e.g., `INSERT INTO TABLE ...`
+ qe.commandExecuted.foreach {
+ case r: CommandResult =>
+ handlePlanWithAQEAndTableCache(r.commandPhysicalPlan,
r.commandLogicalPlan, true) // command plans are passed as already materialized
+ case _ => // ignore
+ }
- FallbackSummary(
- totalNumGlutenNodes,
- totalNumFallbackNodes,
- totalPhysicalPlanDescription.toSeq,
- totalFallbackNodeToReason.toSeq
- )
+ // For query, e.g., `SELECT * FROM ...`
+ if (qe.executedPlan.find(_.isInstanceOf[CommandResultExec]).isEmpty) { // NOTE(review): presumably avoids re-summarizing commands handled via commandExecuted above - confirm
+ val isMaterialized = qe.executedPlan.find { // true when the plan contains a final AdaptiveSparkPlanExec
+ case a: AdaptiveSparkPlanExec if isFinalAdaptivePlan(a) => true
+ case _ => false
+ }.isDefined
+ handlePlanWithAQEAndTableCache(qe.executedPlan, qe.analyzed,
isMaterialized)
}
+ FallbackSummary(
+ totalNumGlutenNodes,
+ totalNumFallbackNodes,
+ totalPhysicalPlanDescription.toSeq,
+ totalFallbackNodeToReason.toSeq
+ )
+ }
+
+ implicit class DatasetTransformer[T](dateset: Dataset[T]) { // adds fallbackSummary() to a Dataset; NOTE(review): `dateset` looks like a typo for `dataset`
def fallbackSummary(): FallbackSummary = {
- collectQueryExecutionFallbackSummary(dateset.queryExecution)
+ collectQueryExecutionFallbackSummary(dateset.sparkSession,
dateset.queryExecution) // summarizes the Dataset's own session and query execution
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]