This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 80716d1  [SPARK-33228][SQL] Don't uncache data when replacing a view having the same logical plan
80716d1 is described below

commit 80716d1a216da12bba0ceeab8c11895d37d5559b
Author: Takeshi Yamamuro <[email protected]>
AuthorDate: Sun Oct 25 16:15:55 2020 -0700

    [SPARK-33228][SQL] Don't uncache data when replacing a view having the same logical plan
    
    ### What changes were proposed in this pull request?
    
    SPARK-30494 updated the `CreateViewCommand` code to implicitly drop the cache when replacing an existing view. But this change drops the cache even when replacing a view that has the same logical plan. A sequence of queries to reproduce this is as follows:
    ```
    // Spark v2.4.6+
    scala> val df = spark.range(1).selectExpr("id a", "id b")
    scala> df.cache()
    scala> df.explain()
    == Physical Plan ==
    *(1) ColumnarToRow
    +- InMemoryTableScan [a#2L, b#3L]
          +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
                +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
                   +- *(1) Range (0, 1, step=1, splits=4)
    
    scala> df.createOrReplaceTempView("t")
    scala> sql("select * from t").explain()
    == Physical Plan ==
    *(1) ColumnarToRow
    +- InMemoryTableScan [a#2L, b#3L]
          +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
                +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
                   +- *(1) Range (0, 1, step=1, splits=4)
    
    // If one re-runs the same query `df.createOrReplaceTempView("t")`, the cache is swept away
    scala> df.createOrReplaceTempView("t")
    scala> sql("select * from t").explain()
    == Physical Plan ==
    *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
    +- *(1) Range (0, 1, step=1, splits=4)
    
    // Until v2.4.6
    scala> val df = spark.range(1).selectExpr("id a", "id b")
    scala> df.cache()
    scala> df.createOrReplaceTempView("t")
    scala> sql("select * from t").explain()
    20/10/23 22:33:42 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    == Physical Plan ==
    *(1) InMemoryTableScan [a#2L, b#3L]
       +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
             +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
                +- *(1) Range (0, 1, step=1, splits=4)
    
    scala> df.createOrReplaceTempView("t")
    scala> sql("select * from t").explain()
    == Physical Plan ==
    *(1) InMemoryTableScan [a#2L, b#3L]
       +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas)
             +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
                +- *(1) Range (0, 1, step=1, splits=4)
    ```
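
    For reference, a minimal sketch of the behavior expected once this change is applied; it mirrors the test added to `CachedTableSuite` below and assumes the usual spark-shell bindings `spark` and `sql`:
    ```
    scala> val df = spark.range(1).selectExpr("id a", "id b")
    scala> df.cache()
    scala> df.createOrReplaceTempView("t")

    // Replacing the view with the same logical plan should no longer sweep the cache away
    scala> df.createOrReplaceTempView("t")
    scala> assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
    scala> sql("select * from t").explain()  // still scans the InMemoryRelation
    ```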
    
    ### Why are the changes needed?
    
    bugfix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added tests.
    
    Closes #30140 from maropu/FixBugInReplaceView.
    
    Authored-by: Takeshi Yamamuro <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 87b498462b82fce02dd50286887092cf7858d2e8)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/sql/execution/command/views.scala | 10 +++++----
 .../org/apache/spark/sql/CachedTableSuite.scala    | 24 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 23f1d6c..0ba76ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -110,17 +110,19 @@ case class CreateViewCommand(
     verifyTemporaryObjectsNotExists(catalog)
 
     if (viewType == LocalTempView) {
-      if (replace && catalog.getTempView(name.table).isDefined) {
-        logDebug(s"Try to uncache ${name.quotedString} before replacing.")
+      if (replace && catalog.getTempView(name.table).isDefined &&
+          !catalog.getTempView(name.table).get.sameResult(child)) {
+        logInfo(s"Try to uncache ${name.quotedString} before replacing.")
         CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
       }
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
      catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
     } else if (viewType == GlobalTempView) {
-      if (replace && catalog.getGlobalTempView(name.table).isDefined) {
+      if (replace && catalog.getGlobalTempView(name.table).isDefined &&
+          !catalog.getGlobalTempView(name.table).get.sameResult(child)) {
        val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
         val globalTempView = TableIdentifier(name.table, Option(db))
-        logDebug(s"Try to uncache ${globalTempView.quotedString} before 
replacing.")
+        logInfo(s"Try to uncache ${globalTempView.quotedString} before 
replacing.")
        CommandUtils.uncacheTableOrView(sparkSession, globalTempView.quotedString)
       }
       val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
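
The guard above relies on `sameResult`, which compares canonicalized logical plans, so the uncache step is skipped only when the replacement view is equivalent to the stored one. A minimal sketch of that check (illustrative only, not part of the patch; assumes a running `SparkSession` named `spark`):
```
// Equivalent plans built independently still satisfy sameResult because
// expression IDs are normalized during canonicalization.
val stored = spark.range(1).selectExpr("id a", "id b").queryExecution.analyzed
val replacement = spark.range(1).selectExpr("id a", "id b").queryExecution.analyzed
assert(stored.sameResult(replacement))
```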
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 20f2a7f..adc725e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -1184,4 +1184,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
       assert(spark.sharedState.cacheManager.isEmpty)
     }
   }
+
+  test("SPARK-33228: Don't uncache data when replacing an existing view having 
the same plan") {
+    withTempView("tempView") {
+      spark.catalog.clearCache()
+      val df = spark.range(1).selectExpr("id a", "id b")
+      df.cache()
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+      df.createOrReplaceTempView("tempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+      df.createOrReplaceTempView("tempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+    }
+
+    withTempView("tempGlobalTempView") {
+      spark.catalog.clearCache()
+      val df = spark.range(1).selectExpr("id a", "id b")
+      df.cache()
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+      df.createOrReplaceGlobalTempView("tempGlobalTempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+      df.createOrReplaceGlobalTempView("tempGlobalTempView")
+      assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
