This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new a32178c  [SPARK-33290][SQL][2.4] REFRESH TABLE should invalidate cache 
even though the table itself may not be cached
a32178c is described below

commit a32178c61d9e8f5b95eb4b401bcc299dac62e236
Author: Chao Sun <[email protected]>
AuthorDate: Sat Oct 31 14:20:32 2020 -0700

    [SPARK-33290][SQL][2.4] REFRESH TABLE should invalidate cache even though 
the table itself may not be cached
    
    Backport #30187 for branch-2.4
    
    ### What changes were proposed in this pull request?
    
    In `CatalogImpl.refreshTable`, this moves the `uncacheQuery` call out of 
the condition `if (cache.nonEmpty)` so that it will be called whether the table 
itself is cached or not.
    
    ### Why are the changes needed?
    
    In the case like the following:
    ```sql
    CREATE TABLE t ...;
    CREATE VIEW t1 AS SELECT * FROM t;
    REFRESH TABLE t;
    ```
    
    If the table `t` is refreshed, the view `t1`, which depends on `t`, will 
not be invalidated. This could lead to incorrect results and is similar to 
[SPARK-19765](https://issues.apache.org/jira/browse/SPARK-19765).
    
    On the other hand, if we have:
    
    ```sql
    CREATE TABLE t ...;
    CACHE TABLE t;
    CREATE VIEW t1 AS SELECT * FROM t;
    REFRESH TABLE t;
    ```
    
    Then the view `t1` will be refreshed. The behavior is somewhat inconsistent.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. With this change, any caches that depend on the refreshed table will 
be invalidated. Previously this only happened if the table itself was cached.
    
    ### How was this patch tested?
    
    Added a new UT for the case.
    
    Closes #30215 from sunchao/backport-SPARK-33290-2.4.
    
    Authored-by: Chao Sun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../apache/spark/sql/internal/CatalogImpl.scala    |  9 +++--
 .../org/apache/spark/sql/CachedTableSuite.scala    | 42 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 4698e8a..1a84b4d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -473,6 +473,9 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    * If this table is cached as an InMemoryRelation, drop the original cached 
version and make the
    * new version cached lazily.
    *
+   * In addition, refreshing a table also invalidates all caches that have a 
reference to the table
+   * in a cascading manner. This is to prevent incorrect results from the 
otherwise stale caches.
+   *
    * @group cachemgmt
    * @since 2.0.0
    */
@@ -492,9 +495,11 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
 
     // If this table is cached as an InMemoryRelation, drop the original
     // cached version and make the new version cached lazily.
+
+    // Uncache the logicalPlan. Note this is a no-op for the table itself if 
it's not cached, but
+    // will invalidate all caches referencing this table.
+    sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = true)
     if (isCached(table)) {
-      // Uncache the logicalPlan.
-      sparkSession.sharedState.cacheManager.uncacheQuery(table, cascade = 
true, blocking = true)
       // Cache it again.
       sparkSession.sharedState.cacheManager.cacheQuery(table, 
Some(tableIdent.table))
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 28c0fa4..34548f8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -931,4 +931,46 @@ class CachedTableSuite extends QueryTest with SQLTestUtils 
with SharedSQLContext
       assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
     }
   }
+
+  test("SPARK-33290: REFRESH TABLE should invalidate all caches referencing 
the table") {
+    withTable("t") {
+      withTempPath { path =>
+        withTempView("tempView1", "tempView2") {
+          Seq((1 -> "a")).toDF("i", "j").write.parquet(path.getCanonicalPath)
+          sql(s"CREATE TABLE t USING parquet LOCATION '${path.toURI}'")
+          sql("CREATE TEMPORARY VIEW tempView1 AS SELECT * FROM t")
+          sql("CACHE TABLE tempView2 AS SELECT i FROM tempView1")
+          checkAnswer(sql("SELECT * FROM tempView1"), Seq(Row(1, "a")))
+          checkAnswer(sql("SELECT * FROM tempView2"), Seq(Row(1)))
+
+          Utils.deleteRecursively(path)
+          sql("REFRESH TABLE tempView1")
+          checkAnswer(sql("SELECT * FROM tempView1"), Seq.empty)
+          checkAnswer(sql("SELECT * FROM tempView2"), Seq.empty)
+        }
+      }
+    }
+  }
+
+  test("SPARK-33290: querying temporary view after REFRESH TABLE fails with 
FNFE") {
+    withTable("t") {
+      withTempPath { path =>
+        withTempView("tempView1") {
+          Seq((1 -> "a")).toDF("i", "j").write.parquet(path.getCanonicalPath)
+          sql(s"CREATE TABLE t USING parquet LOCATION '${path.toURI}'")
+          sql("CREATE TEMPORARY VIEW tempView1 AS SELECT * FROM t")
+          checkAnswer(sql("SELECT * FROM tempView1"), Seq(Row(1, "a")))
+
+          Utils.deleteRecursively(path)
+          sql("REFRESH TABLE t")
+          checkAnswer(sql("SELECT * FROM t"), Seq.empty)
+          val exception = intercept[Exception] {
+            checkAnswer(sql("SELECT * FROM tempView1"), Seq.empty)
+          }
+          assert(exception.getMessage.contains("FileNotFoundException"))
+          assert(exception.getMessage.contains("REFRESH TABLE"))
+        }
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to