This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new b54a94b0d79 [SPARK-43376][SQL] Improve reuse subquery with table cache
b54a94b0d79 is described below

commit b54a94b0d79cdeade724c3a0b683c7105c15f9b9
Author: ulysses-you <[email protected]>
AuthorDate: Sat May 6 11:53:53 2023 +0800

    [SPARK-43376][SQL] Improve reuse subquery with table cache
    
    ### What changes were proposed in this pull request?
    
    AQE can not reuse subquery if it is pushed into `InMemoryTableScan`. There 
are two issues:
    - `ReuseAdaptiveSubquery` can not support reuse subquery if two subquery 
have the same exprId
    -  `InMemoryTableScan` miss apply `ReuseAdaptiveSubquery` when wrap 
`TableCacheQueryStageExec`
    
    For example:
    ```
    Seq(1).toDF("c1").cache().createOrReplaceTempView("t1")
    Seq(2).toDF("c2").createOrReplaceTempView("t2")
    spark.sql("SELECT * FROM t1 WHERE c1 < (SELECT c2 FROM t2)")
    ```
    There are two `subquery#27` but have no `ReusedSubquery`
    
    ```
    AdaptiveSparkPlan isFinalPlan=true
    +- == Final Plan ==
       *(1) Filter (c1#14 < Subquery subquery#27, [id=#20])
       :  +- Subquery subquery#27, [id=#20]
       :     +- AdaptiveSparkPlan isFinalPlan=true
       :        +- LocalTableScan [c2#25]
       +- TableCacheQueryStage 0
          +- InMemoryTableScan [c1#14], [(c1#14 < Subquery subquery#27, 
[id=#20])]
                :- InMemoryRelation [c1#14], StorageLevel(disk, memory, 
deserialized, 1 replicas)
                :     +- LocalTableScan [c1#14]
                +- Subquery subquery#27, [id=#20]
                   +- AdaptiveSparkPlan isFinalPlan=true
                      +- LocalTableScan [c2#25]
    ```
    
    ### Why are the changes needed?
    
    Improve the coverage of reuse subquery.
    
    Note that, it is not a real perf issue because the subquery has been 
already reused (the same Java object). This pr just makes the plan clearer 
about subquery reuse.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    add test
    
    Closes #41046 from ulysses-you/aqe-subquery.
    
    Authored-by: ulysses-you <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  6 ++-
 .../adaptive/InsertAdaptiveSparkPlan.scala         |  2 +-
 .../execution/adaptive/ReuseAdaptiveSubquery.scala | 15 +++++---
 .../org/apache/spark/sql/CachedTableSuite.scala    | 44 ++++++++++++++--------
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 24 +++---------
 .../adaptive/AdaptiveQueryExecSuite.scala          | 15 ++++++++
 6 files changed, 64 insertions(+), 42 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index fceb9db4112..1b2e802ae93 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -587,8 +587,10 @@ case class AdaptiveSparkPlanExec(
           BroadcastQueryStageExec(currentStageId, newPlan, e.canonicalized)
         }
       case i: InMemoryTableScanExec =>
-        // No need to optimize `InMemoryTableScanExec` as it's a leaf node.
-        TableCacheQueryStageExec(currentStageId, i)
+        // Apply `queryStageOptimizerRules` so that we can reuse subquery.
+        // No need to apply `postStageCreationRules` for 
`InMemoryTableScanExec`
+        // as it's a leaf node.
+        TableCacheQueryStageExec(currentStageId, optimizeQueryStage(i, 
isFinalStage = false))
     }
     currentStageId += 1
     setLogicalLinkForNewQueryStage(queryStage, plan)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
index 947a7314142..1f05adc57a4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala
@@ -125,7 +125,7 @@ case class InsertAdaptiveSparkPlan(
   /**
    * Returns an expression-id-to-execution-plan map for all the sub-queries.
    * For each sub-query, generate the adaptive execution plan for each 
sub-query by applying this
-   * rule, or reuse the execution plan from another sub-query of the same 
semantics if possible.
+   * rule.
    */
   private def buildSubqueryMap(plan: SparkPlan): Map[Long, BaseSubqueryExec] = 
{
     val subqueryMap = mutable.HashMap.empty[Long, BaseSubqueryExec]
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala
index c1d0e93e3b9..df684944721 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReuseAdaptiveSubquery.scala
@@ -33,11 +33,16 @@ case class ReuseAdaptiveSubquery(
 
     
plan.transformAllExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
       case sub: ExecSubqueryExpression =>
-        val newPlan = reuseMap.getOrElseUpdate(sub.plan.canonicalized, 
sub.plan)
-        if (newPlan.ne(sub.plan)) {
-          sub.withNewPlan(ReusedSubqueryExec(newPlan))
-        } else {
-          sub
+        // The subquery can be already reused (the same Java object) due to 
filter pushdown
+        // of table cache. If it happens, we just need to wrap the current 
subquery with
+        // `ReusedSubqueryExec` and no need to update the `reuseMap`.
+        reuseMap.get(sub.plan.canonicalized).map { subquery =>
+          sub.withNewPlan(ReusedSubqueryExec(subquery))
+        }.getOrElse {
+          reuseMap.putIfAbsent(sub.plan.canonicalized, sub.plan) match {
+            case Some(subquery) => 
sub.withNewPlan(ReusedSubqueryExec(subquery))
+            case None => sub
+          }
         }
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 5548108b915..1f2235a10a9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -36,7 +36,7 @@ import 
org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Join, 
JoinStrategyHint, SHUFFLE_HASH}
 import org.apache.spark.sql.catalyst.util.DateTimeConstants
 import org.apache.spark.sql.execution.{ColumnarToRowExec, 
ExecSubqueryExpression, RDDScanExec, SparkPlan}
-import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, 
AQEPropagateEmptyRelation}
 import org.apache.spark.sql.execution.columnar._
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.functions._
@@ -823,21 +823,33 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
 
   test("SPARK-19993 subquery with cached underlying relation") {
     withTempView("t1") {
-      Seq(1).toDF("c1").createOrReplaceTempView("t1")
-      spark.catalog.cacheTable("t1")
-
-      // underlying table t1 is cached as well as the query that refers to it.
-      val sqlText =
-        """
-          |SELECT * FROM t1
-          |WHERE
-          |NOT EXISTS (SELECT * FROM t1)
-        """.stripMargin
-      val ds = sql(sqlText)
-      assert(getNumInMemoryRelations(ds) == 2)
-
-      val cachedDs = sql(sqlText).cache()
-      
assert(getNumInMemoryTablesRecursively(cachedDs.queryExecution.sparkPlan) == 3)
+      Seq(false, true).foreach { enabled =>
+        withSQLConf(
+          SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> 
enabled.toString,
+          SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key ->
+            AQEPropagateEmptyRelation.ruleName) {
+
+          Seq(1).toDF("c1").createOrReplaceTempView("t1")
+          spark.catalog.cacheTable("t1")
+
+          // underlying table t1 is cached as well as the query that refers to 
it.
+          val sqlText =
+            """
+              |SELECT * FROM t1
+              |WHERE
+              |NOT EXISTS (SELECT * FROM t1)
+            """.stripMargin
+          val ds = sql(sqlText)
+          assert(getNumInMemoryRelations(ds) == 2)
+
+          val cachedDs = sql(sqlText).cache()
+          cachedDs.collect()
+          
assert(getNumInMemoryTablesRecursively(cachedDs.queryExecution.executedPlan) == 
3)
+
+          cachedDs.unpersist()
+          spark.catalog.uncacheTable("t1")
+        }
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 32d913ca3b4..2425854e3c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -2337,15 +2337,9 @@ class SubquerySuite extends QueryTest
           case rs: ReusedSubqueryExec => rs.child.id
         }
 
-        if (enableAQE) {
-          assert(subqueryIds.size == 3, "Missing or unexpected SubqueryExec in 
the plan")
-          assert(reusedSubqueryIds.size == 4,
-            "Missing or unexpected reused ReusedSubqueryExec in the plan")
-        } else {
-          assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in 
the plan")
-          assert(reusedSubqueryIds.size == 5,
-            "Missing or unexpected reused ReusedSubqueryExec in the plan")
-        }
+        assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in 
the plan")
+        assert(reusedSubqueryIds.size == 5,
+          "Missing or unexpected reused ReusedSubqueryExec in the plan")
       }
     }
   }
@@ -2413,15 +2407,9 @@ class SubquerySuite extends QueryTest
           case rs: ReusedSubqueryExec => rs.child.id
         }
 
-        if (enableAQE) {
-          assert(subqueryIds.size == 3, "Missing or unexpected SubqueryExec in 
the plan")
-          assert(reusedSubqueryIds.size == 3,
-            "Missing or unexpected reused ReusedSubqueryExec in the plan")
-        } else {
-          assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in 
the plan")
-          assert(reusedSubqueryIds.size == 4,
-            "Missing or unexpected reused ReusedSubqueryExec in the plan")
-        }
+        assert(subqueryIds.size == 2, "Missing or unexpected SubqueryExec in 
the plan")
+        assert(reusedSubqueryIds.size == 4,
+          "Missing or unexpected reused ReusedSubqueryExec in the plan")
       }
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 7d0879c21d5..58936f5d8dc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2826,6 +2826,21 @@ class AdaptiveQueryExecSuite
         .executedPlan.isInstanceOf[LocalTableScanExec])
     }
   }
+
+  test("SPARK-43376: Improve reuse subquery with table cache") {
+    withSQLConf(SQLConf.CAN_CHANGE_CACHED_PLAN_OUTPUT_PARTITIONING.key -> 
"true") {
+      withTable("t1", "t2") {
+        withCache("t1") {
+          Seq(1).toDF("c1").cache().createOrReplaceTempView("t1")
+          Seq(2).toDF("c2").createOrReplaceTempView("t2")
+
+          val (_, adaptive) = runAdaptiveAndVerifyResult(
+            "SELECT * FROM t1 WHERE c1 < (SELECT c2 FROM t2)")
+          assert(findReusedSubquery(adaptive).size == 1)
+        }
+      }
+    }
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to