This is an automated email from the ASF dual-hosted git repository.
marong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 60bf4b7426 [GLUTEN-11251] Fix incorrect whole stage id in WholeStageTransformerExec (#11252)
60bf4b7426 is described below
commit 60bf4b74269057e4a800546942b07e1b77411d25
Author: Rong Ma <[email protected]>
AuthorDate: Wed Jan 7 18:32:17 2026 +0000
[GLUTEN-11251] Fix incorrect whole stage id in WholeStageTransformerExec (#11252)
---
.../clickhouse/CHSparkPlanExecApi.scala | 4 +-
.../benchmarks/CHAggAndShuffleBenchmark.scala | 9 +-
.../delta/files/GlutenDeltaFileFormatWriter.scala | 5 +-
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 2 +
.../TakeOrderedAndProjectExecTransformer.scala | 2 +-
.../gluten/execution/WholeStageTransformer.scala | 2 +-
.../ColumnarCollapseTransformStages.scala | 24 +++--
.../sql/execution/GenerateTransformStageId.scala | 76 +++++++++++++
.../spark/sql/execution/GlutenImplicits.scala | 5 +-
.../GlutenFormatWriterInjectsBase.scala | 3 +-
.../gluten/utils/velox/VeloxTestSettings.scala | 2 +-
.../org/apache/spark/sql/GlutenSQLQuerySuite.scala | 117 +++++++++++++++++++++
.../org/apache/gluten/sql/shims/SparkShims.scala | 3 +
.../gluten/sql/shims/spark32/Spark32Shims.scala | 8 ++
.../gluten/sql/shims/spark33/Spark33Shims.scala | 8 ++
.../gluten/sql/shims/spark34/Spark34Shims.scala | 5 +
.../gluten/sql/shims/spark35/Spark35Shims.scala | 5 +
.../gluten/sql/shims/spark40/Spark40Shims.scala | 5 +
.../gluten/sql/shims/spark41/Spark41Shims.scala | 5 +
19 files changed, 267 insertions(+), 23 deletions(-)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 82da9aa694..68ada074e0 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -514,7 +514,9 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
val childWithAdapter = ColumnarCollapseTransformStages.wrapInputIteratorTransformer(child)
WholeStageTransformer(
ProjectExecTransformer(child.output ++ appendedProjections, childWithAdapter))(
- ColumnarCollapseTransformStages.transformStageCounter.incrementAndGet()
+ ColumnarCollapseTransformStages
+ .getTransformStageCounter(childWithAdapter)
+ .incrementAndGet()
)
}
diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHAggAndShuffleBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHAggAndShuffleBenchmark.scala
index 3f7ac3eccc..fb6c7974a8 100644
--- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHAggAndShuffleBenchmark.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHAggAndShuffleBenchmark.scala
@@ -156,7 +156,10 @@ object CHAggAndShuffleBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchma
// Get the `FileSourceScanExecTransformer`
val fileScan = executedPlan.collect { case scan: FileSourceScanExecTransformer => scan }.head
val scanStage = WholeStageTransformer(fileScan)(
- ColumnarCollapseTransformStages.transformStageCounter.incrementAndGet())
+ ColumnarCollapseTransformStages
+ .getTransformStageCounter(fileScan)
+ .incrementAndGet()
+ )
val scanStageRDD = scanStage.executeColumnar()
// Get the total row count
@@ -200,7 +203,9 @@ object CHAggAndShuffleBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchma
val projectFilter = executedPlan.collect { case project: ProjectExecTransformer => project }
if (projectFilter.nonEmpty) {
val projectFilterStage = WholeStageTransformer(projectFilter.head)(
- ColumnarCollapseTransformStages.transformStageCounter.incrementAndGet())
+ ColumnarCollapseTransformStages
+ .getTransformStageCounter(projectFilter.head)
+ .incrementAndGet())
val projectFilterStageRDD = projectFilterStage.executeColumnar()
chAllStagesBenchmark.addCase(s"Project Stage", executedCnt) {
diff --git a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
index 69674b40a3..8b5a4fdc34 100644
--- a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
+++ b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -260,9 +260,8 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
nativeSortPlan
}
val newPlan = sortPlan.child match {
- case WholeStageTransformer(wholeStageChild, materializeInput) =>
- WholeStageTransformer(addNativeSort(wholeStageChild),
- materializeInput)(ColumnarCollapseTransformStages.transformStageCounter.incrementAndGet())
+ case wst @ WholeStageTransformer(wholeStageChild, _) =>
+ wst.withNewChildren(Seq(addNativeSort(wholeStageChild)))
case other =>
Transitions.toBatchPlan(sortPlan, VeloxBatchType)
}
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 7a78bb8468..3bf030bf2c 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -139,6 +139,7 @@ object VeloxRuleApi {
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPost(c => each(c.session)))
injector.injectPost(c => ColumnarCollapseTransformStages(new GlutenConfig(c.sqlConf)))
+ injector.injectPost(_ => GenerateTransformStageId())
injector.injectPost(c => CudfNodeValidationRule(new GlutenConfig(c.sqlConf)))
injector.injectPost(c => GlutenNoopWriterRule(c.session))
@@ -240,6 +241,7 @@ object VeloxRuleApi {
.getExtendedColumnarPostRules()
.foreach(each => injector.injectPostTransform(c => each(c.session)))
injector.injectPostTransform(c => ColumnarCollapseTransformStages(new GlutenConfig(c.sqlConf)))
+ injector.injectPostTransform(_ => GenerateTransformStageId())
injector.injectPostTransform(c => CudfNodeValidationRule(new GlutenConfig(c.sqlConf)))
injector.injectPostTransform(c => GlutenNoopWriterRule(c.session))
injector.injectPostTransform(c => RemoveGlutenTableCacheColumnarToRow(c.session))
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
index 2887812c8a..39b37a0d18 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/TakeOrderedAndProjectExecTransformer.scala
@@ -137,7 +137,7 @@ case class TakeOrderedAndProjectExecTransformer(
LimitExecTransformer(localSortPlan, limitBeforeShuffleOffset, limit)
}
val transformStageCounter: AtomicInteger =
- ColumnarCollapseTransformStages.transformStageCounter
+ ColumnarCollapseTransformStages.getTransformStageCounter(child)
val finalLimitPlan = if (hasShuffle) {
limitBeforeShuffle
} else {
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
index db42778d29..acef5d798e 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WholeStageTransformer.scala
@@ -155,7 +155,7 @@ trait UnaryTransformSupport extends TransformSupport with UnaryExecNode {
}
case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = false)(
- val transformStageId: Int
+ var transformStageId: Int
) extends WholeStageTransformerGenerateTreeStringShim
with UnaryTransformSupport {
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index d41f06d87d..a770fdf7a3 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -139,10 +139,7 @@ case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupp
* created, e.g. for special fallback handling when an existing WholeStageTransformer failed to
* generate/compile code.
*/
-case class ColumnarCollapseTransformStages(
- glutenConf: GlutenConfig,
- transformStageCounter: AtomicInteger = ColumnarCollapseTransformStages.transformStageCounter)
- extends Rule[SparkPlan] {
+case class ColumnarCollapseTransformStages(glutenConf: GlutenConfig) extends Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = {
insertWholeStageTransformer(plan)
@@ -176,8 +173,8 @@ case class ColumnarCollapseTransformStages(
private def insertWholeStageTransformer(plan: SparkPlan): SparkPlan = {
plan match {
case t if supportTransform(t) =>
- WholeStageTransformer(t.withNewChildren(t.children.map(insertInputIteratorTransformer)))(
- transformStageCounter.incrementAndGet())
+ // transformStageId will be updated by rule `GenerateTransformStageId`.
+ WholeStageTransformer(t.withNewChildren(t.children.map(insertInputIteratorTransformer)))(-1)
case other =>
other.withNewChildren(other.children.map(insertWholeStageTransformer))
}
@@ -213,9 +210,20 @@ case class ColumnarInputAdapter(child: SparkPlan)
}
object ColumnarCollapseTransformStages {
- val transformStageCounter = new AtomicInteger(0)
-
def wrapInputIteratorTransformer(plan: SparkPlan): TransformSupport = {
InputIteratorTransformer(ColumnarInputAdapter(plan))
}
+
+ def getTransformStageCounter(plan: SparkPlan): AtomicInteger = {
+ new AtomicInteger(findMaxTransformStageId(plan))
+ }
+
+ private def findMaxTransformStageId(plan: SparkPlan): Int = {
+ plan match {
+ case wst: WholeStageTransformer =>
+ wst.transformStageId
+ case _ =>
+ plan.children.map(findMaxTransformStageId).foldLeft(0)(Math.max)
+ }
+ }
}
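As context for the hunk above, here is a minimal standalone sketch of the seeding behaviour that `getTransformStageCounter` is intended to provide: the returned counter starts at the largest `transformStageId` already present in the given subtree, so a stage built on top of it never reuses an existing id. The `ToyPlan`/`ToyStage`/`ToyLeaf` types are hypothetical stand-ins for real plan nodes and are not part of this commit.

import java.util.concurrent.atomic.AtomicInteger

object StageIdSeedingSketch extends App {
  // Hypothetical stand-ins for SparkPlan and WholeStageTransformer.
  sealed trait ToyPlan { def children: Seq[ToyPlan] }
  case class ToyLeaf() extends ToyPlan { def children: Seq[ToyPlan] = Nil }
  case class ToyStage(id: Int, children: Seq[ToyPlan]) extends ToyPlan

  // Mirrors the shape of findMaxTransformStageId: stop at the first stage node,
  // otherwise take the maximum over the children (0 when no stage exists).
  def maxStageId(plan: ToyPlan): Int = plan match {
    case s: ToyStage => s.id
    case other => other.children.map(maxStageId).foldLeft(0)(Math.max)
  }

  val existing = ToyStage(3, Seq(ToyLeaf()))
  val counter = new AtomicInteger(maxStageId(existing))
  println(counter.incrementAndGet()) // 4: a stage wrapped around `existing` gets a fresh id
}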
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GenerateTransformStageId.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GenerateTransformStageId.scala
new file mode 100644
index 0000000000..b28eac8a12
--- /dev/null
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GenerateTransformStageId.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.exception.GlutenException
+import org.apache.gluten.execution.WholeStageTransformer
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, BroadcastQueryStageExec, ShuffleQueryStageExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ReusedExchangeExec, ShuffleExchangeLike}
+
+import java.util
+import java.util.Collections.newSetFromMap
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * Generate `transformStageId` for `WholeStageTransformerExec`. This rule updates the whole plan
+ * tree with incremental and unique transform stage ids before the final execution.
+ *
+ * In Spark, the whole stage id is generated by incrementing a global counter. In Gluten, it's not
+ * possible to use a global counter for id generation, especially in the case of AQE.
+ */
+case class GenerateTransformStageId() extends Rule[SparkPlan] with AdaptiveSparkPlanHelper {
+ private val transformStageCounter: AtomicInteger = new AtomicInteger(0)
+
+ private val wholeStageTransformerCache =
+ newSetFromMap[WholeStageTransformer](new util.IdentityHashMap())
+
+ def apply(plan: SparkPlan): SparkPlan = {
+ updateStageId(plan)
+ plan
+ }
+
+ private def updateStageId(plan: SparkPlan): Unit = {
+ plan match {
+ case b: BroadcastQueryStageExec =>
+ b.plan match {
+ case b: BroadcastExchangeLike => updateStageId(b)
+ case _: ReusedExchangeExec =>
+ case _ =>
+ throw new GlutenException(s"wrong plan for broadcast stage:\n
${plan.treeString}")
+ }
+ case s: ShuffleQueryStageExec =>
+ s.plan match {
+ case s: ShuffleExchangeLike => updateStageId(s)
+ case _: ReusedExchangeExec =>
+ case _ =>
+ throw new GlutenException(s"wrong plan for shuffle stage:\n
${plan.treeString}")
+ }
+ case aqe: AdaptiveSparkPlanExec if SparkShimLoader.getSparkShims.isFinalAdaptivePlan(aqe) =>
+ updateStageId(stripAQEPlan(aqe))
+ case wst: WholeStageTransformer if !wholeStageTransformerCache.contains(wst) =>
+ updateStageId(wst.child)
+ wst.transformStageId = transformStageCounter.incrementAndGet()
+ wholeStageTransformerCache.add(wst)
+ case plan =>
+ plan.subqueries.foreach(updateStageId)
+ plan.children.foreach(updateStageId)
+ }
+ }
+}
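For readers unfamiliar with the new rule, a minimal standalone sketch of how `GenerateTransformStageId` is intended to number stages: ids are assigned children-first from a counter owned by the rule instance (i.e. per query plan), rather than from a JVM-global counter. The `ToyPlan`/`ToyScan`/`ToyStage` types are hypothetical stand-ins for `SparkPlan` and `WholeStageTransformer` and are not part of this commit.

import java.util.concurrent.atomic.AtomicInteger

object TransformStageIdSketch extends App {
  // Hypothetical stand-ins for SparkPlan and WholeStageTransformer.
  sealed trait ToyPlan { def children: Seq[ToyPlan] }
  case class ToyScan() extends ToyPlan { def children: Seq[ToyPlan] = Nil }
  case class ToyStage(child: ToyPlan, var transformStageId: Int = -1) extends ToyPlan {
    def children: Seq[ToyPlan] = Seq(child)
  }

  // One counter per applied rule (per query plan), not one per JVM.
  val counter = new AtomicInteger(0)

  def updateStageId(plan: ToyPlan): Unit = plan match {
    case stage: ToyStage =>
      updateStageId(stage.child) // number nested stages first
      stage.transformStageId = counter.incrementAndGet()
    case other =>
      other.children.foreach(updateStageId)
  }

  val plan = ToyStage(ToyStage(ToyScan())) // e.g. a scan stage nested under an aggregate stage
  updateStageId(plan)
  println(plan) // inner stage gets id 1, outer stage gets id 2
}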
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index 7267ce56ba..48710a9edf 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -95,10 +95,7 @@ object GlutenImplicits {
}
private def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
- val args = p.argString(Int.MaxValue)
- val index = args.indexOf("isFinalPlan=")
- assert(index >= 0)
- args.substring(index + "isFinalPlan=".length).trim.toBoolean
+ SparkShimLoader.getSparkShims.isFinalAdaptivePlan(p)
}
private def collectFallbackNodes(
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
index 53ee2855e6..7c9721e2f0 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/datasources/GlutenFormatWriterInjectsBase.scala
@@ -23,7 +23,6 @@ import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
import org.apache.gluten.extension.columnar.transition.{InsertTransitions, RemoveTransitions}
import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, SparkPlan}
-import org.apache.spark.sql.execution.ColumnarCollapseTransformStages.transformStageCounter
trait GlutenFormatWriterInjectsBase extends GlutenFormatWriterInjects {
private lazy val transform = HeuristicTransform.static()
@@ -66,7 +65,7 @@ trait GlutenFormatWriterInjectsBase extends GlutenFormatWriterInjects {
// and cannot provide const-ness.
val transformedWithAdapter = injectAdapter(transformed)
val wst = WholeStageTransformer(transformedWithAdapter, materializeInput = true)(
- transformStageCounter.incrementAndGet())
+ ColumnarCollapseTransformStages.getTransformStageCounter(transformed).incrementAndGet())
val wstWithTransitions =
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarToCarrierRow(
InsertTransitions.create(outputsColumnar = true, wst.batchType()).apply(wst))
wstWithTransitions
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index d1c66e66d9..24b84d58ae 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -858,7 +858,7 @@ class VeloxTestSettings extends BackendTestSettings {
// ORC related
.exclude("SPARK-37965: Spark support read/write orc file with invalid char
in field name")
.exclude("SPARK-38173: Quoted column cannot be recognized correctly when
quotedRegexColumnNames is true")
- // TODO: fix in Spark-4.0
+ // Rewrite with Gluten's explained result.
.exclude("SPARK-47939: Explain should work with parameterized queries")
enableSuite[GlutenSQLQueryTestSuite]
enableSuite[GlutenStatisticsCollectionSuite]
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
index 8f397c517e..c75569af25 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala
@@ -154,4 +154,121 @@ class GlutenSQLQuerySuite extends SQLQuerySuite with GlutenSQLTestsTrait {
assert(inputOutputPairs.map(_._1).sum == numRows)
}
}
+
+ testGluten("SPARK-47939: Explain should work with parameterized queries") {
+ def checkQueryPlan(df: DataFrame, plan: String): Unit = assert(
+ df.collect()
+ .map(_.getString(0))
+ .map(_.replaceAll("#[0-9]+", "#N"))
+ // Remove the backend keyword in c2r/r2c.
+ .map(_.replaceAll("[A-Za-z]*ColumnarToRow", "ColumnarToRow"))
+ .map(_.replaceAll("RowTo[A-Za-z]*Columnar", "RowToColumnar"))
+ === Array(plan.stripMargin)
+ )
+
+ checkQueryPlan(
+ spark.sql("explain select ?", Array(1)),
+ """== Physical Plan ==
+ |ColumnarToRow
+ |+- ^(1) ProjectExecTransformer [1 AS 1#N]
+ | +- ^(1) InputIteratorTransformer[]
+ | +- RowToColumnar
+ | +- *(1) Scan OneRowRelation[]
+ |
+ |"""
+ )
+ checkQueryPlan(
+ spark.sql("explain select :first", Map("first" -> 1)),
+ """== Physical Plan ==
+ |ColumnarToRow
+ |+- ^(1) ProjectExecTransformer [1 AS 1#N]
+ | +- ^(1) InputIteratorTransformer[]
+ | +- RowToColumnar
+ | +- *(1) Scan OneRowRelation[]
+ |
+ |"""
+ )
+
+ checkQueryPlan(
+ spark.sql("explain explain explain select ?", Array(1)),
+ """== Physical Plan ==
+ |Execute ExplainCommand
+ | +- ExplainCommand ExplainCommand 'PosParameterizedQuery [1], SimpleMode, SimpleMode
+
+ |"""
+ )
+ checkQueryPlan(
+ spark.sql("explain explain explain select :first", Map("first" -> 1)),
+ // scalastyle:off
+ """== Physical Plan ==
+ |Execute ExplainCommand
+ | +- ExplainCommand ExplainCommand 'NameParameterizedQuery [first], [1], SimpleMode, SimpleMode
+
+ |"""
+ // scalastyle:on
+ )
+
+ checkQueryPlan(
+ spark.sql("explain describe select ?", Array(1)),
+ """== Physical Plan ==
+ |Execute DescribeQueryCommand
+ | +- DescribeQueryCommand select ?
+
+ |"""
+ )
+ checkQueryPlan(
+ spark.sql("explain describe select :first", Map("first" -> 1)),
+ """== Physical Plan ==
+ |Execute DescribeQueryCommand
+ | +- DescribeQueryCommand select :first
+
+ |"""
+ )
+
+ checkQueryPlan(
+ spark.sql("explain extended select * from values (?, ?) t(x, y)",
Array(1, "a")),
+ """== Parsed Logical Plan ==
+ |'PosParameterizedQuery [1, a]
+ |+- 'Project [*]
+ | +- 'SubqueryAlias t
+ | +- 'UnresolvedInlineTable [x, y], [[posparameter(39), posparameter(42)]]
+
+ |== Analyzed Logical Plan ==
+ |x: int, y: string
+ |Project [x#N, y#N]
+ |+- SubqueryAlias t
+ | +- LocalRelation [x#N, y#N]
+
+ |== Optimized Logical Plan ==
+ |LocalRelation [x#N, y#N]
+
+ |== Physical Plan ==
+ |LocalTableScan [x#N, y#N]
+ |"""
+ )
+ checkQueryPlan(
+ spark.sql(
+ "explain extended select * from values (:first, :second) t(x, y)",
+ Map("first" -> 1, "second" -> "a")
+ ),
+ """== Parsed Logical Plan ==
+ |'NameParameterizedQuery [first, second], [1, a]
+ |+- 'Project [*]
+ | +- 'SubqueryAlias t
+ | +- 'UnresolvedInlineTable [x, y], [[namedparameter(first), namedparameter(second)]]
+
+ |== Analyzed Logical Plan ==
+ |x: int, y: string
+ |Project [x#N, y#N]
+ |+- SubqueryAlias t
+ | +- LocalRelation [x#N, y#N]
+
+ |== Optimized Logical Plan ==
+ |LocalRelation [x#N, y#N]
+
+ |== Physical Plan ==
+ |LocalTableScan [x#N, y#N]
+ |"""
+ )
+ }
}
diff --git a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 9164f4b7c4..c2542f0368 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanExecBase}
@@ -365,4 +366,6 @@ trait SparkShims {
sparkSession: SparkSession,
planner: SparkPlanner,
plan: LogicalPlan): SparkPlan
+
+ def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean
}
diff --git a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index d1bf07e64b..4d02992954 100644
--- a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, QueryExecution, SparkPlan, SparkPlanner}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -315,4 +316,11 @@ class Spark32Shims extends SparkShims {
planner: SparkPlanner,
plan: LogicalPlan): SparkPlan =
QueryExecution.createSparkPlan(sparkSession, planner, plan)
+
+ override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
+ val args = p.argString(Int.MaxValue)
+ val index = args.indexOf("isFinalPlan=")
+ assert(index >= 0)
+ args.substring(index + "isFinalPlan=".length).trim.toBoolean
+ }
}
diff --git a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index a18fb31719..19bb10c0ee 100644
--- a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.util.RebaseDateTime.RebaseSpec
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.execution.{FileSourceScanExec, PartitionedFileUtil, QueryExecution, SparkPlan, SparkPlanner}
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.FileFormatWriter.Empty2Null
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
@@ -420,4 +421,11 @@ class Spark33Shims extends SparkShims {
planner: SparkPlanner,
plan: LogicalPlan): SparkPlan =
QueryExecution.createSparkPlan(sparkSession, planner, plan)
+
+ override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
+ val args = p.argString(Int.MaxValue)
+ val index = args.indexOf("isFinalPlan=")
+ assert(index >= 0)
+ args.substring(index + "isFinalPlan=".length).trim.toBoolean
+ }
}
diff --git a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index cdbeaa4783..199c5313da 100644
--- a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanExecBase}
@@ -663,4 +664,8 @@ class Spark34Shims extends SparkShims {
planner: SparkPlanner,
plan: LogicalPlan): SparkPlan =
QueryExecution.createSparkPlan(sparkSession, planner, plan)
+
+ override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
+ p.isFinalPlan
+ }
}
diff --git a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index d993cc0bfd..0619cea66a 100644
--- a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -44,6 +44,7 @@ import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetFilters}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV2ScanExecBase}
@@ -714,4 +715,8 @@ class Spark35Shims extends SparkShims {
planner: SparkPlanner,
plan: LogicalPlan): SparkPlan =
QueryExecution.createSparkPlan(sparkSession, planner, plan)
+
+ override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
+ p.isFinalPlan
+ }
}
diff --git a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index e5258eafa4..72c9b27293 100644
--- a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetFilters}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, BatchScanExecShim, DataSourceV2ScanExecBase}
@@ -765,4 +766,8 @@ class Spark40Shims extends SparkShims {
planner: SparkPlanner,
plan: LogicalPlan): SparkPlan =
QueryExecution.createSparkPlan(sparkSession, planner, plan)
+
+ override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
+ p.isFinalPlan
+ }
}
diff --git a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
index ea5f733614..8aad6394f0 100644
--- a/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
+++ b/shims/spark41/src/main/scala/org/apache/gluten/sql/shims/spark41/Spark41Shims.scala
@@ -45,6 +45,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.read.{HasPartitionKey, InputPartition, Scan}
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetFilters}
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, BatchScanExecShim, DataSourceV2ScanExecBase}
@@ -764,4 +765,8 @@ class Spark41Shims extends SparkShims {
planner: SparkPlanner,
plan: LogicalPlan): SparkPlan =
QueryExecution.createSparkPlan(planner, plan)
+
+ override def isFinalAdaptivePlan(p: AdaptiveSparkPlanExec): Boolean = {
+ p.isFinalPlan
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]