This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new b54a94b0d79 [SPARK-43376][SQL] Improve reuse subquery with table cache
b54a94b0d79 is described below
commit b54a94b0d79cdeade724c3a0b683c7105c15f9b9
Author: ulysses-you <[email protected]>
AuthorDate: Sat May 6 11:53:53 2023 +0800
[SPARK-43376][SQL] Improve reuse subquery with table cache
### What changes were proposed in this pull request?
AQE cannot reuse a subquery if it is pushed into `InMemoryTableScan`. There
are two issues:
- `ReuseAdaptiveSubquery` cannot reuse a subquery if two subqueries
have the same exprId
- `InMemoryTableScan` fails to apply `ReuseAdaptiveSubquery` when wrapped in
`TableCacheQueryStageExec`
For example:
```
Seq(1).toDF("c1").cache().createOrReplaceTempView("t1")
Seq(2).toDF("c2").createOrReplaceTempView("t2")
spark.sql("SELECT * FROM t1 WHERE c1 < (SELECT c2 FROM t2)")
```
There are two `subquery#27` instances but no `ReusedSubquery`
```
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
*(1) Filter (c1#14 < Subquery subquery#27, [id=#20])
: +- Subquery subquery#27, [id=#20]
: +- AdaptiveSparkPlan isFinalPlan=true
: +- LocalTableScan [c2#25]
+- TableCacheQueryStage 0
+- InMemoryTableScan [c1#14], [(c1#14 < Subquery subquery#27,
[id=#20])]
:- InMemoryRelation [c1#14], StorageLevel(disk, memory,
deserialized, 1 replicas)
: +- LocalTableScan [c1#14]
+- Subquery subquery#27, [id=#20]
+- AdaptiveSparkPlan isFinalPlan=true
+- LocalTableScan [c2#25]
```
### Why are the changes needed?
Improve the coverage of subquery reuse.
Note that this is not a real performance issue because the subquery has
already been reused (it is the same Java object). This PR just makes the plan
clearer about subquery reuse.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
add test
Closes #41046 from ulysses-you/aqe-subquery.
Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
---
.../execution/adaptive/AdaptiveSparkPlanExec.scala | 6 ++-
.../adaptive/InsertAdaptiveSparkPlan.scala | 2 +-
.../execution/adaptive/ReuseAdaptiveSubquery.scala | 15 +++++---
.../org/apache/spark/sql/CachedTableSuite.scala | 44 ++++++++++++++--------
.../scala/org/apache/spark/sql/SubquerySuite.scala | 24 +++---------
.../adaptive/AdaptiveQueryExecSuite.scala | 15 ++++++++
6 files changed, 64 insertions(+), 42 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index fceb9db4112..1b2e802ae93 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -587,8 +587,10 @@ case class AdaptiveSparkPlanExec(
BroadcastQueryStageExec(currentStageId, newPlan, e.canonicalized)
}
case i: InMemoryTableScanExec =>
- // No need to optimize `InMemoryTableScanExec` as it's a leaf node.
- TableCacheQueryStageExec(currentStageId, i)
+ // Apply `queryStageOptimizerRules` so that we can reuse subquery.
+ // No need to apply `postStageCreationRules` for
`InMemoryTableScanExec`
+ // as it's a leaf node.
+ TableCacheQueryStageExec(currentStageId, optimizeQueryStage(i,
isFinalStage = false))
}
currentStageId += 1
setLogicalLinkForNewQueryStage(queryStage, plan)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 947a7314142..1f05adc57a4 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -125,7 +125,7 @@ case class InsertAdaptiveSparkPlan(
/**
* Returns an expression-id-to-execution-plan map for all the sub-queries.
* For each sub-query, generate the adaptive execution plan for each
sub-query by applying this
- * rule, or reuse the execution plan from another sub-query of the same
semantics if possible.
+ * rule.
*/
private def buildSubqueryMap(plan: SparkPlan): Map[Long, BaseSubqueryExec] =
{
val subqueryMap = mutable.HashMap.empty[Long, BaseSubqueryExec]
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala
index c1d0e93e3b9..df684944721 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala
@@ -33,11 +33,16 @@ case class ReuseAdaptiveSubquery(
plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
case sub: ExecSubqueryExpression =>
- val newPlan = reuseMap.getOrElseUpdate(sub.plan.canonicalized,
sub.plan)
- if (newPlan.ne(sub.plan)) {
- sub.withNewPlan(ReusedSubqueryExec(newPlan))
- } else {
- sub
+ // The subquery can be already reused (the same Java object) due to
filter pushdown
+ // of table cache. If it happens, we just need to wrap the current
subquery with
+ // `ReusedSubqueryExec` and no need to update the `reuseMap`.
+ reuseMap.get(sub.plan.canonicalized).map { subquery =>
+ sub.withNewPlan(ReusedSubqueryExec(subquery))
+ }.getOrElse {
+ reuseMap.putIfAbsent(sub.plan.canonicalized, sub.plan) match {
+ case Some(subquery) =>
sub.withNewPlan(ReusedSubqueryExec(subquery))
+ case None => sub
+ }
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 5548108b915..1f2235a10a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -36,7 +36,7 @@ import
org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join,
JoinStrategyHint, SHUFFLE_HASH}
import org.apache.spark.sql.catalyst.util.DateTimeConstants
import org.apache.spark.sql.execution.{ColumnarToRowExec,
ExecSubqueryExpression, RDDScanExec, SparkPlan}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper,
AQEPropagateEmptyRelation}
import org.apache.spark.sql.execution.columnar._
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.functions._
@@ -823,21 +823,33 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
test("SPARK-19993 subquery with cached underlying relation") {
withTempView("t1") {
- Seq(1).toDF("c1").createOrReplaceTempView("t1")
- spark.catalog.cacheTable("t1")
-
- // underlying table t1 is cached as well as the query that refers to it.
- val sqlText =
- """
- |SELECT * FROM t1
- |WHERE
- |NOT EXISTS (SELECT * FROM t1)
- """.stripMargin
- val ds = sql(sqlText)
- assert(getNumInMemoryRelations(ds) == 2)
-
- val cachedDs = sql(sqlText).cache()
-
assert(getNumInMemoryTablesRecursively(cachedDs.queryExecution.sparkPlan) == 3)
+ Seq(false, true).foreach { enabled =>
+ withSQLConf(
+ SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key ->
enabled.toString,
+ SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key ->
+ AQEPropagateEmptyRelation.ruleName) {
+
+ Seq(1).toDF("c1").createOrReplaceTempView("t1")
+ spark.catalog.cacheTable("t1")
+
+ // underlying table t1 is cached as well as the query that refers to
it.
+ val sqlText =
+ """
+ |SELECT * FROM t1
+ |WHERE
+ |NOT EXISTS (SELECT * FROM t1)
+ """.stripMargin
+ val ds = sql(sqlText)
+ assert(getNumInMemoryRelations(ds) == 2)
+
+ val cachedDs = sql(sqlText).cache()
+ cachedDs.collect()
+
assert(getNumInMemoryTablesRecursively(cachedDs.queryExecution.executedPlan) ==
3)
+
+ cachedDs.unpersist()
+ spark.catalog.uncacheTable("t1")
+ }
+ }
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 32d913ca3b4..2425854e3c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2337,15 +2337,9 @@ class SubquerySuite extends QueryTest
case rs: ReusedSubqueryExec => rs.child.id
}
- if (enableAQE) {
- assert(subqueryIds.size == 3, "Missing or unexpected SubqueryExec in
the plan")
- assert(reusedSubqueryIds.size == 4,
- "Missing or unexpected reused ReusedSubqueryExec in the plan")
- } else {
- assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in
the plan")
- assert(reusedSubqueryIds.size == 5,
- "Missing or unexpected reused ReusedSubqueryExec in the plan")
- }
+ assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in
the plan")
+ assert(reusedSubqueryIds.size == 5,
+ "Missing or unexpected reused ReusedSubqueryExec in the plan")
}
}
}
@@ -2413,15 +2407,9 @@ class SubquerySuite extends QueryTest
case rs: ReusedSubqueryExec => rs.child.id
}
- if (enableAQE) {
- assert(subqueryIds.size == 3, "Missing or unexpected SubqueryExec in
the plan")
- assert(reusedSubqueryIds.size == 3,
- "Missing or unexpected reused ReusedSubqueryExec in the plan")
- } else {
- assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in
the plan")
- assert(reusedSubqueryIds.size == 4,
- "Missing or unexpected reused ReusedSubqueryExec in the plan")
- }
+ assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in
the plan")
+ assert(reusedSubqueryIds.size == 4,
+ "Missing or unexpected reused ReusedSubqueryExec in the plan")
}
}
}
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 7d0879c21d5..58936f5d8dc 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2826,6 +2826,21 @@ class AdaptiveQueryExecSuite
.executedPlan.isInstanceOf[LocalTableScanExec])
}
}
+
+ test("SPARK-43376: Improve reuse subquery with table cache") {
+ withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key ->
"true") {
+ withTable("t1", "t2") {
+ withCache("t1") {
+ Seq(1).toDF("c1").cache().createOrReplaceTempView("t1")
+ Seq(2).toDF("c2").createOrReplaceTempView("t2")
+
+ val (_, adaptive) = runAdaptiveAndVerifyResult(
+ "SELECT * FROM t1 WHERE c1 < (SELECT c2 FROM t2)")
+ assert(findReusedSubquery(adaptive).size == 1)
+ }
+ }
+ }
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]