This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new c9748d4 [SPARK-31495][SQL] Support formatted explain for AQE
c9748d4 is described below
commit c9748d4f00c505053c81c8aeb69f7166e92f82a6
Author: yi.wu <[email protected]>
AuthorDate: Wed Apr 22 12:44:06 2020 +0000
[SPARK-31495][SQL] Support formatted explain for AQE
### What changes were proposed in this pull request?
To support formatted explain for AQE.
### Why are the changes needed?
AQE does not support formatted explain yet. It's good to support it for a
better user experience, easier debugging, etc.
Before:
```
== Physical Plan ==
AdaptiveSparkPlan (1)
+- * HashAggregate (unknown)
+- CustomShuffleReader (unknown)
+- ShuffleQueryStage (unknown)
+- Exchange (unknown)
+- * HashAggregate (unknown)
+- * Project (unknown)
+- * BroadcastHashJoin Inner BuildRight (unknown)
:- * LocalTableScan (unknown)
+- BroadcastQueryStage (unknown)
+- BroadcastExchange (unknown)
+- LocalTableScan (unknown)
(1) AdaptiveSparkPlan
Output [4]: [k#7, count(v1)#32L, sum(v1)#33L, avg(v2)#34]
Arguments: HashAggregate(keys=[k#7], functions=[count(1), sum(cast(v1#8 as bigint)), avg(cast(v2#19 as bigint))]), AdaptiveExecutionContext(org.apache.spark.sql.SparkSession@104ab57b), [PlanAdaptiveSubqueries(Map())], false
```
After:
```
== Physical Plan ==
AdaptiveSparkPlan (14)
+- * HashAggregate (13)
+- CustomShuffleReader (12)
+- ShuffleQueryStage (11)
+- Exchange (10)
+- * HashAggregate (9)
+- * Project (8)
+- * BroadcastHashJoin Inner BuildRight (7)
:- * Project (2)
: +- * LocalTableScan (1)
+- BroadcastQueryStage (6)
+- BroadcastExchange (5)
+- * Project (4)
+- * LocalTableScan (3)
(1) LocalTableScan [codegen id : 2]
Output [2]: [_1#x, _2#x]
Arguments: [_1#x, _2#x]
(2) Project [codegen id : 2]
Output [2]: [_1#x AS k#x, _2#x AS v1#x]
Input [2]: [_1#x, _2#x]
(3) LocalTableScan [codegen id : 1]
Output [2]: [_1#x, _2#x]
Arguments: [_1#x, _2#x]
(4) Project [codegen id : 1]
Output [2]: [_1#x AS k#x, _2#x AS v2#x]
Input [2]: [_1#x, _2#x]
(5) BroadcastExchange
Input [2]: [k#x, v2#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#x]
(6) BroadcastQueryStage
Output [2]: [k#x, v2#x]
Arguments: 0
(7) BroadcastHashJoin [codegen id : 2]
Left keys [1]: [k#x]
Right keys [1]: [k#x]
Join condition: None
(8) Project [codegen id : 2]
Output [3]: [k#x, v1#x, v2#x]
Input [4]: [k#x, v1#x, k#x, v2#x]
(9) HashAggregate [codegen id : 2]
Input [3]: [k#x, v1#x, v2#x]
Keys [1]: [k#x]
Functions [3]: [partial_count(1), partial_sum(cast(v1#x as bigint)), partial_avg(cast(v2#x as bigint))]
Aggregate Attributes [4]: [count#xL, sum#xL, sum#x, count#xL]
Results [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
(10) Exchange
Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
Arguments: hashpartitioning(k#x, 5), true, [id=#x]
(11) ShuffleQueryStage
Output [5]: [sum#xL, k#x, sum#x, count#xL, count#xL]
Arguments: 1
(12) CustomShuffleReader
Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
Arguments: coalesced
(13) HashAggregate [codegen id : 3]
Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
Keys [1]: [k#x]
Functions [3]: [count(1), sum(cast(v1#x as bigint)), avg(cast(v2#x as bigint))]
Aggregate Attributes [3]: [count(1)#xL, sum(cast(v1#x as bigint))#xL, avg(cast(v2#x as bigint))#x]
Results [4]: [k#x, count(1)#xL AS count(v1)#xL, sum(cast(v1#x as bigint))#xL AS sum(v1)#xL, avg(cast(v2#x as bigint))#x AS avg(v2)#x]
(14) AdaptiveSparkPlan
Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
Arguments: isFinalPlan=true
```
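For reference, a minimal sketch of how this output can be produced from user code (this snippet is not part of the patch; it assumes a local `SparkSession` and simply mirrors the query used in the new unit test):
```
// Hypothetical reproduction sketch, not part of this change.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, sum}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("aqe-formatted-explain")
  .config("spark.sql.adaptive.enabled", "true") // enable AQE
  .getOrCreate()
import spark.implicits._

val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1")
val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2")
val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2"))

testDf.collect()            // run the query first so AQE can settle on the final plan
testDf.explain("formatted") // prints the numbered tree plus per-operator details as above
```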
### Does this PR introduce any user-facing change?
No, this is a new feature introduced along with AQE in Spark 3.0.
### How was this patch tested?
Added a query file: `explain-aqe.sql` and a unit test.
Closes #28271 from Ngone51/support_formatted_explain_for_aqe.
Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 8fbfdb38c0baff7bc5d1ce1e3d6f1df70c25da70)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../apache/spark/sql/execution/ExplainUtils.scala | 59 +-
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 5 +-
.../adaptive/InsertAdaptiveSparkPlan.scala | 2 +-
.../resources/sql-tests/inputs/explain-aqe.sql | 3 +
.../test/resources/sql-tests/inputs/explain.sql | 1 +
.../{explain.sql.out => explain-aqe.sql.out} | 669 ++++++++-------------
.../resources/sql-tests/results/explain.sql.out | 10 +-
.../scala/org/apache/spark/sql/ExplainSuite.scala | 72 ++-
.../adaptive/AdaptiveQueryExecSuite.scala | 6 +-
9 files changed, 358 insertions(+), 469 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
index 5d43093..aec1c93 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala
@@ -23,8 +23,9 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
-object ExplainUtils {
+object ExplainUtils extends AdaptiveSparkPlanHelper {
/**
* Given a input physical plan, performs the following tasks.
* 1. Computes the operator id for current operator and records it in the operaror
@@ -144,15 +145,26 @@ object ExplainUtils {
case p: WholeStageCodegenExec =>
case p: InputAdapter =>
case other: QueryPlan[_] =>
- if (!other.getTagValue(QueryPlan.OP_ID_TAG).isDefined) {
+
+ def setOpId(): Unit = if (other.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
currentOperationID += 1
other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
operatorIDs += ((currentOperationID, other))
}
- other.innerChildren.foreach { plan =>
- currentOperationID = generateOperatorIDs(plan,
- currentOperationID,
- operatorIDs)
+
+ other match {
+ case p: AdaptiveSparkPlanExec =>
+ currentOperationID =
+ generateOperatorIDs(p.executedPlan, currentOperationID, operatorIDs)
+ setOpId()
+ case p: QueryStageExec =>
+ currentOperationID = generateOperatorIDs(p.plan, currentOperationID, operatorIDs)
+ setOpId()
+ case _ =>
+ setOpId()
+ other.innerChildren.foldLeft(currentOperationID) {
+ (curId, plan) => generateOperatorIDs(plan, curId, operatorIDs)
+ }
}
}
currentOperationID
@@ -163,21 +175,25 @@ object ExplainUtils {
* whole stage code gen id in the plan via setting a tag.
*/
private def generateWholeStageCodegenIds(plan: QueryPlan[_]): Unit = {
+ var currentCodegenId = -1
+
+ def setCodegenId(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
+ if (currentCodegenId != -1) {
+ p.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
+ }
+ children.foreach(generateWholeStageCodegenIds)
+ }
+
// Skip the subqueries as they are not printed as part of main query block.
if (plan.isInstanceOf[BaseSubqueryExec]) {
return
}
- var currentCodegenId = -1
plan.foreach {
case p: WholeStageCodegenExec => currentCodegenId = p.codegenStageId
case _: InputAdapter => currentCodegenId = -1
- case other: QueryPlan[_] =>
- if (currentCodegenId != -1) {
- other.setTagValue(QueryPlan.CODEGEN_ID_TAG, currentCodegenId)
- }
- other.innerChildren.foreach { plan =>
- generateWholeStageCodegenIds(plan)
- }
+ case p: AdaptiveSparkPlanExec => setCodegenId(p, Seq(p.executedPlan))
+ case p: QueryStageExec => setCodegenId(p, Seq(p.plan))
+ case other: QueryPlan[_] => setCodegenId(other, other.innerChildren)
}
}
@@ -232,13 +248,16 @@ object ExplainUtils {
}
def removeTags(plan: QueryPlan[_]): Unit = {
+ def remove(p: QueryPlan[_], children: Seq[QueryPlan[_]]): Unit = {
+ p.unsetTagValue(QueryPlan.OP_ID_TAG)
+ p.unsetTagValue(QueryPlan.CODEGEN_ID_TAG)
+ children.foreach(removeTags)
+ }
+
plan foreach {
- case plan: QueryPlan[_] =>
- plan.unsetTagValue(QueryPlan.OP_ID_TAG)
- plan.unsetTagValue(QueryPlan.CODEGEN_ID_TAG)
- plan.innerChildren.foreach { p =>
- removeTags(p)
- }
+ case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan))
+ case p: QueryStageExec => remove(p, Seq(p.plan))
+ case plan: QueryPlan[_] => remove(plan, plan.innerChildren)
}
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index d73540c..f00dce2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -251,10 +251,7 @@ case class AdaptiveSparkPlanExec(
getFinalPhysicalPlan().execute()
}
- override def verboseString(maxFields: Int): String = simpleString(maxFields)
-
- override def simpleString(maxFields: Int): String =
- s"AdaptiveSparkPlan(isFinalPlan=$isFinalPlan)"
+ protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
override def generateTreeString(
depth: Int,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index ea586f0..754225d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -122,7 +122,7 @@ case class InsertAdaptiveSparkPlan(
if !subqueryMap.contains(exprId.id) =>
val executedPlan = compileSubquery(p)
verifyAdaptivePlan(executedPlan, p)
- val subquery = SubqueryExec(s"subquery${exprId.id}", executedPlan)
+ val subquery = SubqueryExec(s"subquery#${exprId.id}", executedPlan)
subqueryMap.put(exprId.id, subquery)
case expressions.InSubquery(_, ListQuery(query, _, exprId, _))
if !subqueryMap.contains(exprId.id) =>
diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
new file mode 100644
index 0000000..f4afa2b
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/explain-aqe.sql
@@ -0,0 +1,3 @@
+--IMPORT explain.sql
+
+--SET spark.sql.adaptive.enabled=true
diff --git a/sql/core/src/test/resources/sql-tests/inputs/explain.sql b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
index 497b61c..80bf258 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/explain.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/explain.sql
@@ -117,3 +117,4 @@ EXPLAIN FORMATTED
DROP TABLE explain_temp1;
DROP TABLE explain_temp2;
DROP TABLE explain_temp3;
+DROP TABLE explain_temp4;
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
similarity index 57%
copy from sql/core/src/test/resources/sql-tests/results/explain.sql.out
copy to sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index 06226f1..e1d4fa8 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 22
+-- Number of queries: 23
-- !query
@@ -53,14 +53,14 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-* Sort (9)
-+- Exchange (8)
- +- * HashAggregate (7)
- +- Exchange (6)
- +- * HashAggregate (5)
- +- * Project (4)
- +- * Filter (3)
- +- * ColumnarToRow (2)
+AdaptiveSparkPlan (9)
++- Sort (8)
+ +- Exchange (7)
+ +- HashAggregate (6)
+ +- Exchange (5)
+ +- HashAggregate (4)
+ +- Project (3)
+ +- Filter (2)
+- Scan parquet default.explain_temp1 (1)
@@ -71,42 +71,43 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(3) Filter [codegen id : 1]
+(2) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-(4) Project [codegen id : 1]
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-(5) HashAggregate [codegen id : 1]
+(4) HashAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_max(val#x)]
Aggregate Attributes [1]: [max#x]
Results [2]: [key#x, max#x]
-(6) Exchange
+(5) Exchange
Input [2]: [key#x, max#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-(7) HashAggregate [codegen id : 2]
+(6) HashAggregate
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [2]: [key#x, max(val#x)#x AS max(val)#x]
-(8) Exchange
+(7) Exchange
Input [2]: [key#x, max(val)#x]
Arguments: rangepartitioning(key#x ASC NULLS FIRST, 4), true, [id=#x]
-(9) Sort [codegen id : 3]
+(8) Sort
Input [2]: [key#x, max(val)#x]
Arguments: [key#x ASC NULLS FIRST], true, 0
+
+(9) AdaptiveSparkPlan
+Output [2]: [key#x, max(val)#x]
+Arguments: isFinalPlan=false
-- !query
@@ -120,14 +121,14 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-* Project (9)
-+- * Filter (8)
- +- * HashAggregate (7)
- +- Exchange (6)
- +- * HashAggregate (5)
- +- * Project (4)
- +- * Filter (3)
- +- * ColumnarToRow (2)
+AdaptiveSparkPlan (9)
++- Project (8)
+ +- Filter (7)
+ +- HashAggregate (6)
+ +- Exchange (5)
+ +- HashAggregate (4)
+ +- Project (3)
+ +- Filter (2)
+- Scan parquet default.explain_temp1 (1)
@@ -138,42 +139,43 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(3) Filter [codegen id : 1]
+(2) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-(4) Project [codegen id : 1]
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-(5) HashAggregate [codegen id : 1]
+(4) HashAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_max(val#x)]
Aggregate Attributes [1]: [max#x]
Results [2]: [key#x, max#x]
-(6) Exchange
+(5) Exchange
Input [2]: [key#x, max#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-(7) HashAggregate [codegen id : 2]
+(6) HashAggregate
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [3]: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x]
-(8) Filter [codegen id : 2]
+(7) Filter
Input [3]: [key#x, max(val)#x, max(val#x)#x]
Condition : (isnotnull(max(val#x)#x) AND (max(val#x)#x > 0))
-(9) Project [codegen id : 2]
+(8) Project
Output [2]: [key#x, max(val)#x]
Input [3]: [key#x, max(val)#x, max(val#x)#x]
+
+(9) AdaptiveSparkPlan
+Output [2]: [key#x, max(val)#x]
+Arguments: isFinalPlan=false
-- !query
@@ -185,18 +187,17 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-* HashAggregate (12)
-+- Exchange (11)
- +- * HashAggregate (10)
- +- Union (9)
- :- * Project (4)
- : +- * Filter (3)
- : +- * ColumnarToRow (2)
- : +- Scan parquet default.explain_temp1 (1)
- +- * Project (8)
- +- * Filter (7)
- +- * ColumnarToRow (6)
- +- Scan parquet default.explain_temp1 (5)
+AdaptiveSparkPlan (11)
++- HashAggregate (10)
+ +- Exchange (9)
+ +- HashAggregate (8)
+ +- Union (7)
+ :- Project (3)
+ : +- Filter (2)
+ : +- Scan parquet default.explain_temp1 (1)
+ +- Project (6)
+ +- Filter (5)
+ +- Scan parquet default.explain_temp1 (4)
(1) Scan parquet default.explain_temp1
@@ -206,54 +207,52 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(3) Filter [codegen id : 1]
+(2) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-(4) Project [codegen id : 1]
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-(5) Scan parquet default.explain_temp1
+(4) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,0)]
ReadSchema: struct<key:int,val:int>
-(6) ColumnarToRow [codegen id : 2]
-Input [2]: [key#x, val#x]
-
-(7) Filter [codegen id : 2]
+(5) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 0))
-(8) Project [codegen id : 2]
+(6) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-(9) Union
+(7) Union
-(10) HashAggregate [codegen id : 3]
+(8) HashAggregate
Input [2]: [key#x, val#x]
Keys [2]: [key#x, val#x]
Functions: []
Aggregate Attributes: []
Results [2]: [key#x, val#x]
-(11) Exchange
+(9) Exchange
Input [2]: [key#x, val#x]
Arguments: hashpartitioning(key#x, val#x, 4), true, [id=#x]
-(12) HashAggregate [codegen id : 4]
+(10) HashAggregate
Input [2]: [key#x, val#x]
Keys [2]: [key#x, val#x]
Functions: []
Aggregate Attributes: []
Results [2]: [key#x, val#x]
+
+(11) AdaptiveSparkPlan
+Output [2]: [key#x, val#x]
+Arguments: isFinalPlan=false
-- !query
@@ -266,16 +265,15 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-* BroadcastHashJoin Inner BuildRight (10)
-:- * Project (4)
-: +- * Filter (3)
-: +- * ColumnarToRow (2)
-: +- Scan parquet default.explain_temp1 (1)
-+- BroadcastExchange (9)
- +- * Project (8)
- +- * Filter (7)
- +- * ColumnarToRow (6)
- +- Scan parquet default.explain_temp2 (5)
+AdaptiveSparkPlan (9)
++- BroadcastHashJoin Inner BuildRight (8)
+ :- Project (3)
+ : +- Filter (2)
+ : +- Scan parquet default.explain_temp1 (1)
+ +- BroadcastExchange (7)
+ +- Project (6)
+ +- Filter (5)
+ +- Scan parquet default.explain_temp2 (4)
(1) Scan parquet default.explain_temp1
@@ -285,43 +283,41 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int,val:int>
-(2) ColumnarToRow [codegen id : 2]
-Input [2]: [key#x, val#x]
-
-(3) Filter [codegen id : 2]
+(2) Filter
Input [2]: [key#x, val#x]
Condition : isnotnull(key#x)
-(4) Project [codegen id : 2]
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-(5) Scan parquet default.explain_temp2
+(4) Scan parquet default.explain_temp2
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int,val:int>
-(6) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(7) Filter [codegen id : 1]
+(5) Filter
Input [2]: [key#x, val#x]
Condition : isnotnull(key#x)
-(8) Project [codegen id : 1]
+(6) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-(9) BroadcastExchange
+(7) BroadcastExchange
Input [2]: [key#x, val#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-(10) BroadcastHashJoin [codegen id : 2]
+(8) BroadcastHashJoin
Left keys [1]: [key#x]
Right keys [1]: [key#x]
Join condition: None
+
+(9) AdaptiveSparkPlan
+Output [4]: [key#x, val#x, key#x, val#x]
+Arguments: isFinalPlan=false
-- !query
@@ -334,14 +330,13 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-* BroadcastHashJoin LeftOuter BuildRight (8)
-:- * ColumnarToRow (2)
-: +- Scan parquet default.explain_temp1 (1)
-+- BroadcastExchange (7)
- +- * Project (6)
- +- * Filter (5)
- +- * ColumnarToRow (4)
- +- Scan parquet default.explain_temp2 (3)
+AdaptiveSparkPlan (7)
++- BroadcastHashJoin LeftOuter BuildRight (6)
+ :- Scan parquet default.explain_temp1 (1)
+ +- BroadcastExchange (5)
+ +- Project (4)
+ +- Filter (3)
+ +- Scan parquet default.explain_temp2 (2)
(1) Scan parquet default.explain_temp1
@@ -350,35 +345,33 @@ Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int,val:int>
-(2) ColumnarToRow [codegen id : 2]
-Input [2]: [key#x, val#x]
-
-(3) Scan parquet default.explain_temp2
+(2) Scan parquet default.explain_temp2
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp2]
PushedFilters: [IsNotNull(key)]
ReadSchema: struct<key:int,val:int>
-(4) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(5) Filter [codegen id : 1]
+(3) Filter
Input [2]: [key#x, val#x]
Condition : isnotnull(key#x)
-(6) Project [codegen id : 1]
+(4) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-(7) BroadcastExchange
+(5) BroadcastExchange
Input [2]: [key#x, val#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-(8) BroadcastHashJoin [codegen id : 2]
+(6) BroadcastHashJoin
Left keys [1]: [key#x]
Right keys [1]: [key#x]
Join condition: None
+
+(7) AdaptiveSparkPlan
+Output [4]: [key#x, val#x, key#x, val#x]
+Arguments: isFinalPlan=false
-- !query
@@ -396,9 +389,9 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-* Project (4)
-+- * Filter (3)
- +- * ColumnarToRow (2)
+AdaptiveSparkPlan (4)
++- Project (3)
+ +- Filter (2)
+- Scan parquet default.explain_temp1 (1)
@@ -409,110 +402,17 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), IsNotNull(val), GreaterThan(val,3)]
ReadSchema: struct<key:int,val:int>
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(3) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x > 3))
-
-(4) Project [codegen id : 1]
-Output [2]: [key#x, val#x]
+(2) Filter
Input [2]: [key#x, val#x]
+Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery subquery#x, [id=#x])) AND (val#x > 3))
-===== Subqueries =====
-
-Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
-* HashAggregate (11)
-+- Exchange (10)
- +- * HashAggregate (9)
- +- * Project (8)
- +- * Filter (7)
- +- * ColumnarToRow (6)
- +- Scan parquet default.explain_temp2 (5)
-
-
-(5) Scan parquet default.explain_temp2
+(3) Project
Output [2]: [key#x, val#x]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/explain_temp2]
-PushedFilters: [IsNotNull(key), IsNotNull(val), EqualTo(val,2)]
-ReadSchema: struct<key:int,val:int>
-
-(6) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(7) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : (((isnotnull(key#x) AND isnotnull(val#x)) AND (key#x = Subquery scalar-subquery#x, [id=#x])) AND (val#x = 2))
-
-(8) Project [codegen id : 1]
-Output [1]: [key#x]
Input [2]: [key#x, val#x]
-(9) HashAggregate [codegen id : 1]
-Input [1]: [key#x]
-Keys: []
-Functions [1]: [partial_max(key#x)]
-Aggregate Attributes [1]: [max#x]
-Results [1]: [max#x]
-
-(10) Exchange
-Input [1]: [max#x]
-Arguments: SinglePartition, true, [id=#x]
-
-(11) HashAggregate [codegen id : 2]
-Input [1]: [max#x]
-Keys: []
-Functions [1]: [max(key#x)]
-Aggregate Attributes [1]: [max(key#x)#x]
-Results [1]: [max(key#x)#x AS max(key)#x]
-
-Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
-* HashAggregate (18)
-+- Exchange (17)
- +- * HashAggregate (16)
- +- * Project (15)
- +- * Filter (14)
- +- * ColumnarToRow (13)
- +- Scan parquet default.explain_temp3 (12)
-
-
-(12) Scan parquet default.explain_temp3
+(4) AdaptiveSparkPlan
Output [2]: [key#x, val#x]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/explain_temp3]
-PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
-ReadSchema: struct<key:int,val:int>
-
-(13) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(14) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : (isnotnull(val#x) AND (val#x > 0))
-
-(15) Project [codegen id : 1]
-Output [1]: [key#x]
-Input [2]: [key#x, val#x]
-
-(16) HashAggregate [codegen id : 1]
-Input [1]: [key#x]
-Keys: []
-Functions [1]: [partial_max(key#x)]
-Aggregate Attributes [1]: [max#x]
-Results [1]: [max#x]
-
-(17) Exchange
-Input [1]: [max#x]
-Arguments: SinglePartition, true, [id=#x]
-
-(18) HashAggregate [codegen id : 2]
-Input [1]: [max#x]
-Keys: []
-Functions [1]: [max(key#x)]
-Aggregate Attributes [1]: [max(key#x)#x]
-Results [1]: [max(key#x)#x AS max(key)#x]
+Arguments: isFinalPlan=false
-- !query
@@ -530,8 +430,8 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-* Filter (3)
-+- * ColumnarToRow (2)
+AdaptiveSparkPlan (3)
++- Filter (2)
+- Scan parquet default.explain_temp1 (1)
@@ -541,106 +441,13 @@ Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int,val:int>
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(3) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : ((key#x = Subquery scalar-subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery scalar-subquery#x, [id=#x]))
-
-===== Subqueries =====
-
-Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
-* HashAggregate (10)
-+- Exchange (9)
- +- * HashAggregate (8)
- +- * Project (7)
- +- * Filter (6)
- +- * ColumnarToRow (5)
- +- Scan parquet default.explain_temp2 (4)
-
-
-(4) Scan parquet default.explain_temp2
-Output [2]: [key#x, val#x]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/explain_temp2]
-PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
-ReadSchema: struct<key:int,val:int>
-
-(5) ColumnarToRow [codegen id : 1]
+(2) Filter
Input [2]: [key#x, val#x]
-
-(6) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : (isnotnull(val#x) AND (val#x > 0))
-
-(7) Project [codegen id : 1]
-Output [1]: [key#x]
-Input [2]: [key#x, val#x]
-
-(8) HashAggregate [codegen id : 1]
-Input [1]: [key#x]
-Keys: []
-Functions [1]: [partial_max(key#x)]
-Aggregate Attributes [1]: [max#x]
-Results [1]: [max#x]
-
-(9) Exchange
-Input [1]: [max#x]
-Arguments: SinglePartition, true, [id=#x]
-
-(10) HashAggregate [codegen id : 2]
-Input [1]: [max#x]
-Keys: []
-Functions [1]: [max(key#x)]
-Aggregate Attributes [1]: [max(key#x)#x]
-Results [1]: [max(key#x)#x AS max(key)#x]
+Condition : ((key#x = Subquery subquery#x, [id=#x]) OR (cast(key#x as double) = Subquery subquery#x, [id=#x]))
-Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
-* HashAggregate (17)
-+- Exchange (16)
- +- * HashAggregate (15)
- +- * Project (14)
- +- * Filter (13)
- +- * ColumnarToRow (12)
- +- Scan parquet default.explain_temp3 (11)
-
-
-(11) Scan parquet default.explain_temp3
+(3) AdaptiveSparkPlan
Output [2]: [key#x, val#x]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/explain_temp3]
-PushedFilters: [IsNotNull(val), GreaterThan(val,0)]
-ReadSchema: struct<key:int,val:int>
-
-(12) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(13) Filter [codegen id : 1]
-Input [2]: [key#x, val#x]
-Condition : (isnotnull(val#x) AND (val#x > 0))
-
-(14) Project [codegen id : 1]
-Output [1]: [key#x]
-Input [2]: [key#x, val#x]
-
-(15) HashAggregate [codegen id : 1]
-Input [1]: [key#x]
-Keys: []
-Functions [1]: [partial_avg(cast(key#x as bigint))]
-Aggregate Attributes [2]: [sum#x, count#xL]
-Results [2]: [sum#x, count#xL]
-
-(16) Exchange
-Input [2]: [sum#x, count#xL]
-Arguments: SinglePartition, true, [id=#x]
-
-(17) HashAggregate [codegen id : 2]
-Input [2]: [sum#x, count#xL]
-Keys: []
-Functions [1]: [avg(cast(key#x as bigint))]
-Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
-Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]
+Arguments: isFinalPlan=false
-- !query
@@ -651,8 +458,8 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-* Project (3)
-+- * ColumnarToRow (2)
+AdaptiveSparkPlan (3)
++- Project (2)
+- Scan parquet default.explain_temp1 (1)
@@ -662,51 +469,13 @@ Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<>
-(2) ColumnarToRow [codegen id : 1]
+(2) Project
+Output [1]: [(Subquery subquery#x, [id=#x] + Subquery subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x]
Input: []
-
-(3) Project [codegen id : 1]
-Output [1]: [(Subquery scalar-subquery#x, [id=#x] + ReusedSubquery Subquery scalar-subquery#x, [id=#x]) AS (scalarsubquery() + scalarsubquery())#x]
-Input: []
-
-===== Subqueries =====
-
-Subquery:1 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
-* HashAggregate (8)
-+- Exchange (7)
- +- * HashAggregate (6)
- +- * ColumnarToRow (5)
- +- Scan parquet default.explain_temp1 (4)
-
-
-(4) Scan parquet default.explain_temp1
-Output [1]: [key#x]
-Batched: true
-Location [not included in comparison]/{warehouse_dir}/explain_temp1]
-ReadSchema: struct<key:int>
-
-(5) ColumnarToRow [codegen id : 1]
-Input [1]: [key#x]
-
-(6) HashAggregate [codegen id : 1]
-Input [1]: [key#x]
-Keys: []
-Functions [1]: [partial_avg(cast(key#x as bigint))]
-Aggregate Attributes [2]: [sum#x, count#xL]
-Results [2]: [sum#x, count#xL]
-
-(7) Exchange
-Input [2]: [sum#x, count#xL]
-Arguments: SinglePartition, true, [id=#x]
-
-(8) HashAggregate [codegen id : 2]
-Input [2]: [sum#x, count#xL]
-Keys: []
-Functions [1]: [avg(cast(key#x as bigint))]
-Aggregate Attributes [1]: [avg(cast(key#x as bigint))#x]
-Results [1]: [avg(cast(key#x as bigint))#x AS avg(key)#x]
-Subquery:2 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x]
+(3) AdaptiveSparkPlan
+Output [1]: [(scalarsubquery() + scalarsubquery())#x]
+Arguments: isFinalPlan=false
-- !query
@@ -721,16 +490,15 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-* BroadcastHashJoin Inner BuildRight (10)
-:- * Project (4)
-: +- * Filter (3)
-: +- * ColumnarToRow (2)
-: +- Scan parquet default.explain_temp1 (1)
-+- BroadcastExchange (9)
- +- * Project (8)
- +- * Filter (7)
- +- * ColumnarToRow (6)
- +- Scan parquet default.explain_temp1 (5)
+AdaptiveSparkPlan (9)
++- BroadcastHashJoin Inner BuildRight (8)
+ :- Project (3)
+ : +- Filter (2)
+ : +- Scan parquet default.explain_temp1 (1)
+ +- BroadcastExchange (7)
+ +- Project (6)
+ +- Filter (5)
+ +- Scan parquet default.explain_temp1 (4)
(1) Scan parquet default.explain_temp1
@@ -740,43 +508,41 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
ReadSchema: struct<key:int,val:int>
-(2) ColumnarToRow [codegen id : 2]
-Input [2]: [key#x, val#x]
-
-(3) Filter [codegen id : 2]
+(2) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 10))
-(4) Project [codegen id : 2]
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-(5) Scan parquet default.explain_temp1
+(4) Scan parquet default.explain_temp1
Output [2]: [key#x, val#x]
Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
ReadSchema: struct<key:int,val:int>
-(6) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(7) Filter [codegen id : 1]
+(5) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 10))
-(8) Project [codegen id : 1]
+(6) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-(9) BroadcastExchange
+(7) BroadcastExchange
Input [2]: [key#x, val#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-(10) BroadcastHashJoin [codegen id : 2]
+(8) BroadcastHashJoin
Left keys [1]: [key#x]
Right keys [1]: [key#x]
Join condition: None
+
+(9) AdaptiveSparkPlan
+Output [4]: [key#x, val#x, key#x, val#x]
+Arguments: isFinalPlan=false
-- !query
@@ -792,17 +558,21 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-* BroadcastHashJoin Inner BuildRight (11)
-:- * HashAggregate (7)
-: +- Exchange (6)
-: +- * HashAggregate (5)
-: +- * Project (4)
-: +- * Filter (3)
-: +- * ColumnarToRow (2)
-: +- Scan parquet default.explain_temp1 (1)
-+- BroadcastExchange (10)
- +- * HashAggregate (9)
- +- ReusedExchange (8)
+AdaptiveSparkPlan (15)
++- BroadcastHashJoin Inner BuildRight (14)
+ :- HashAggregate (6)
+ : +- Exchange (5)
+ : +- HashAggregate (4)
+ : +- Project (3)
+ : +- Filter (2)
+ : +- Scan parquet default.explain_temp1 (1)
+ +- BroadcastExchange (13)
+ +- HashAggregate (12)
+ +- Exchange (11)
+ +- HashAggregate (10)
+ +- Project (9)
+ +- Filter (8)
+ +- Scan parquet default.explain_temp1 (7)
(1) Scan parquet default.explain_temp1
@@ -812,53 +582,77 @@ Location [not included in comparison]/{warehouse_dir}/explain_temp1]
PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
ReadSchema: struct<key:int,val:int>
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(3) Filter [codegen id : 1]
+(2) Filter
Input [2]: [key#x, val#x]
Condition : (isnotnull(key#x) AND (key#x > 10))
-(4) Project [codegen id : 1]
+(3) Project
Output [2]: [key#x, val#x]
Input [2]: [key#x, val#x]
-(5) HashAggregate [codegen id : 1]
+(4) HashAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_max(val#x)]
Aggregate Attributes [1]: [max#x]
Results [2]: [key#x, max#x]
-(6) Exchange
+(5) Exchange
Input [2]: [key#x, max#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-(7) HashAggregate [codegen id : 4]
+(6) HashAggregate
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [2]: [key#x, max(val#x)#x AS max(val)#x]
-(8) ReusedExchange [Reuses operator id: 6]
-Output [2]: [key#x, max#x]
+(7) Scan parquet default.explain_temp1
+Output [2]: [key#x, val#x]
+Batched: true
+Location [not included in comparison]/{warehouse_dir}/explain_temp1]
+PushedFilters: [IsNotNull(key), GreaterThan(key,10)]
+ReadSchema: struct<key:int,val:int>
+
+(8) Filter
+Input [2]: [key#x, val#x]
+Condition : (isnotnull(key#x) AND (key#x > 10))
-(9) HashAggregate [codegen id : 3]
+(9) Project
+Output [2]: [key#x, val#x]
+Input [2]: [key#x, val#x]
+
+(10) HashAggregate
+Input [2]: [key#x, val#x]
+Keys [1]: [key#x]
+Functions [1]: [partial_max(val#x)]
+Aggregate Attributes [1]: [max#x]
+Results [2]: [key#x, max#x]
+
+(11) Exchange
+Input [2]: [key#x, max#x]
+Arguments: hashpartitioning(key#x, 4), true, [id=#x]
+
+(12) HashAggregate
Input [2]: [key#x, max#x]
Keys [1]: [key#x]
Functions [1]: [max(val#x)]
Aggregate Attributes [1]: [max(val#x)#x]
Results [2]: [key#x, max(val#x)#x AS max(val)#x]
-(10) BroadcastExchange
+(13) BroadcastExchange
Input [2]: [key#x, max(val)#x]
Arguments: HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#x]
-(11) BroadcastHashJoin [codegen id : 4]
+(14) BroadcastHashJoin
Left keys [1]: [key#x]
Right keys [1]: [key#x]
Join condition: None
+
+(15) AdaptiveSparkPlan
+Output [4]: [key#x, max(val)#x, key#x, max(val)#x]
+Arguments: isFinalPlan=false
-- !query
@@ -898,10 +692,10 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-* HashAggregate (5)
-+- Exchange (4)
- +- HashAggregate (3)
- +- * ColumnarToRow (2)
+AdaptiveSparkPlan (5)
++- HashAggregate (4)
+ +- Exchange (3)
+ +- HashAggregate (2)
+- Scan parquet default.explain_temp1 (1)
@@ -911,26 +705,27 @@ Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp1]
ReadSchema: struct<key:int,val:int>
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(3) HashAggregate
+(2) HashAggregate
Input [2]: [key#x, val#x]
Keys: []
Functions [3]: [partial_count(val#x), partial_sum(cast(key#x as bigint)), partial_count(key#x) FILTER (WHERE (val#x > 1))]
Aggregate Attributes [3]: [count#xL, sum#xL, count#xL]
Results [3]: [count#xL, sum#xL, count#xL]
-(4) Exchange
+(3) Exchange
Input [3]: [count#xL, sum#xL, count#xL]
Arguments: SinglePartition, true, [id=#x]
-(5) HashAggregate [codegen id : 2]
+(4) HashAggregate
Input [3]: [count#xL, sum#xL, count#xL]
Keys: []
Functions [3]: [count(val#x), sum(cast(key#x as bigint)), count(key#x)]
Aggregate Attributes [3]: [count(val#x)#xL, sum(cast(key#x as bigint))#xL, count(key#x)#xL]
Results [2]: [(count(val#x)#xL + sum(cast(key#x as bigint))#xL) AS TOTAL#xL, count(key#x)#xL AS count(key) FILTER (WHERE (val > 1))#xL]
+
+(5) AdaptiveSparkPlan
+Output [2]: [TOTAL#xL, count(key) FILTER (WHERE (val > 1))#xL]
+Arguments: isFinalPlan=false
-- !query
@@ -942,10 +737,10 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-ObjectHashAggregate (5)
-+- Exchange (4)
- +- ObjectHashAggregate (3)
- +- * ColumnarToRow (2)
+AdaptiveSparkPlan (5)
++- ObjectHashAggregate (4)
+ +- Exchange (3)
+ +- ObjectHashAggregate (2)
+- Scan parquet default.explain_temp4 (1)
@@ -955,26 +750,27 @@ Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp4]
ReadSchema: struct<key:int,val:string>
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(3) ObjectHashAggregate
+(2) ObjectHashAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_collect_set(val#x, 0, 0)]
Aggregate Attributes [1]: [buf#x]
Results [2]: [key#x, buf#x]
-(4) Exchange
+(3) Exchange
Input [2]: [key#x, buf#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-(5) ObjectHashAggregate
+(4) ObjectHashAggregate
Input [2]: [key#x, buf#x]
Keys [1]: [key#x]
Functions [1]: [collect_set(val#x, 0, 0)]
Aggregate Attributes [1]: [collect_set(val#x, 0, 0)#x]
Results [2]: [key#x, sort_array(collect_set(val#x, 0, 0)#x, true)[0] AS sort_array(collect_set(val), true)[0]#x]
+
+(5) AdaptiveSparkPlan
+Output [2]: [key#x, sort_array(collect_set(val), true)[0]#x]
+Arguments: isFinalPlan=false
-- !query
@@ -986,12 +782,12 @@ EXPLAIN FORMATTED
struct<plan:string>
-- !query output
== Physical Plan ==
-SortAggregate (7)
-+- * Sort (6)
- +- Exchange (5)
- +- SortAggregate (4)
- +- * Sort (3)
- +- * ColumnarToRow (2)
+AdaptiveSparkPlan (7)
++- SortAggregate (6)
+ +- Sort (5)
+ +- Exchange (4)
+ +- SortAggregate (3)
+ +- Sort (2)
+- Scan parquet default.explain_temp4 (1)
@@ -1001,34 +797,35 @@ Batched: true
Location [not included in comparison]/{warehouse_dir}/explain_temp4]
ReadSchema: struct<key:int,val:string>
-(2) ColumnarToRow [codegen id : 1]
-Input [2]: [key#x, val#x]
-
-(3) Sort [codegen id : 1]
+(2) Sort
Input [2]: [key#x, val#x]
Arguments: [key#x ASC NULLS FIRST], false, 0
-(4) SortAggregate
+(3) SortAggregate
Input [2]: [key#x, val#x]
Keys [1]: [key#x]
Functions [1]: [partial_min(val#x)]
Aggregate Attributes [1]: [min#x]
Results [2]: [key#x, min#x]
-(5) Exchange
+(4) Exchange
Input [2]: [key#x, min#x]
Arguments: hashpartitioning(key#x, 4), true, [id=#x]
-(6) Sort [codegen id : 2]
+(5) Sort
Input [2]: [key#x, min#x]
Arguments: [key#x ASC NULLS FIRST], false, 0
-(7) SortAggregate
+(6) SortAggregate
Input [2]: [key#x, min#x]
Keys [1]: [key#x]
Functions [1]: [min(val#x)]
Aggregate Attributes [1]: [min(val#x)#x]
Results [2]: [key#x, min(val#x)#x AS min(val)#x]
+
+(7) AdaptiveSparkPlan
+Output [2]: [key#x, min(val)#x]
+Arguments: isFinalPlan=false
-- !query
@@ -1053,3 +850,11 @@ DROP TABLE explain_temp3
struct<>
-- !query output
+
+
+-- !query
+DROP TABLE explain_temp4
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 06226f1..1a18d56 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 22
+-- Number of queries: 23
-- !query
@@ -1053,3 +1053,11 @@ DROP TABLE explain_temp3
struct<>
-- !query output
+
+
+-- !query
+DROP TABLE explain_temp4
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index b204709..a1b6d71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -18,16 +18,15 @@
package org.apache.spark.sql
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
+import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType
-class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiveExecutionSuite {
- import testImplicits._
+trait ExplainSuiteHelper extends QueryTest with SharedSparkSession {
- private def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String = {
+ protected def getNormalizedExplain(df: DataFrame, mode: ExplainMode): String = {
val output = new java.io.ByteArrayOutputStream()
Console.withOut(output) {
df.explain(mode.name)
@@ -38,7 +37,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv
/**
* Get the explain from a DataFrame and run the specified action on it.
*/
- private def withNormalizedExplain(df: DataFrame, mode: ExplainMode)(f: String => Unit) = {
+ protected def withNormalizedExplain(df: DataFrame, mode: ExplainMode)(f: String => Unit) = {
f(getNormalizedExplain(df, mode))
}
@@ -46,7 +45,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv
* Get the explain by running the sql. The explain mode should be part of the
* sql text itself.
*/
- private def withNormalizedExplain(queryText: String)(f: String => Unit) = {
+ protected def withNormalizedExplain(queryText: String)(f: String => Unit) = {
val output = new java.io.ByteArrayOutputStream()
Console.withOut(output) {
sql(queryText).show(false)
@@ -58,7 +57,7 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv
/**
* Runs the plan and makes sure the plans contains all of the keywords.
*/
- private def checkKeywordsExistsInExplain(
+ protected def checkKeywordsExistsInExplain(
df: DataFrame, mode: ExplainMode, keywords: String*): Unit = {
withNormalizedExplain(df, mode) { normalizedOutput =>
for (key <- keywords) {
@@ -67,9 +66,13 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv
}
}
- private def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = {
+ protected def checkKeywordsExistsInExplain(df: DataFrame, keywords: String*): Unit = {
checkKeywordsExistsInExplain(df, ExtendedMode, keywords: _*)
}
+}
+
+class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite {
+ import testImplicits._
test("SPARK-23034 show rdd names in RDD scan nodes (Dataset)") {
val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: Nil).setName("testRdd")
@@ -342,4 +345,57 @@ class ExplainSuite extends QueryTest with SharedSparkSession with DisableAdaptiv
}
}
+class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite {
+ import testImplicits._
+
+ test("Explain formatted") {
+ val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1")
+ val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2")
+ val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2"))
+ // trigger the final plan for AQE
+ testDf.collect()
+ // whitespace
+ val ws = " "
+ // == Physical Plan ==
+ // AdaptiveSparkPlan (14)
+ // +- * HashAggregate (13)
+ // +- CustomShuffleReader (12)
+ // +- ShuffleQueryStage (11)
+ // +- Exchange (10)
+ // +- * HashAggregate (9)
+ // +- * Project (8)
+ // +- * BroadcastHashJoin Inner BuildRight (7)
+ // :- * Project (2)
+ // : +- * LocalTableScan (1)
+ // +- BroadcastQueryStage (6)
+ // +- BroadcastExchange (5)
+ // +- * Project (4)
+ // +- * LocalTableScan (3)
+ checkKeywordsExistsInExplain(
+ testDf,
+ FormattedMode,
+ s"""
+ |(6) BroadcastQueryStage$ws
+ |Output [2]: [k#x, v2#x]
+ |Arguments: 0
+ |""".stripMargin,
+ s"""
+ |(11) ShuffleQueryStage$ws
+ |Output [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
+ |Arguments: 1
+ |""".stripMargin,
+ s"""
+ |(12) CustomShuffleReader$ws
+ |Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
+ |Arguments: coalesced
+ |""".stripMargin,
+ s"""
+ |(14) AdaptiveSparkPlan$ws
+ |Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
+ |Arguments: isFinalPlan=true
+ |""".stripMargin
+ )
+ }
+}
+
case class ExplainSingleData(id: Int)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 8225863..c6caffa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -53,7 +53,7 @@ class AdaptiveQueryExecSuite
event match {
case SparkListenerSQLAdaptiveExecutionUpdate(_, _, sparkPlanInfo) =>
if (sparkPlanInfo.simpleString.startsWith(
- "AdaptiveSparkPlan(isFinalPlan=true)")) {
+ "AdaptiveSparkPlan isFinalPlan=true")) {
finalPlanCnt += 1
}
case _ => // ignore other events
@@ -64,14 +64,14 @@ class AdaptiveQueryExecSuite
val dfAdaptive = sql(query)
val planBefore = dfAdaptive.queryExecution.executedPlan
- assert(planBefore.toString.startsWith("AdaptiveSparkPlan(isFinalPlan=false)"))
+ assert(planBefore.toString.startsWith("AdaptiveSparkPlan isFinalPlan=false"))
val result = dfAdaptive.collect()
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
val df = sql(query)
QueryTest.sameRows(result.toSeq, df.collect().toSeq)
}
val planAfter = dfAdaptive.queryExecution.executedPlan
- assert(planAfter.toString.startsWith("AdaptiveSparkPlan(isFinalPlan=true)"))
+ assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
spark.sparkContext.listenerBus.waitUntilEmpty()
assert(finalPlanCnt == 1)