This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e684720  [SPARK-33435][SQL][3.0] DSv2: REFRESH TABLE should invalidate 
caches referencing the table
e684720 is described below

commit e684720243d9b8c86f04b4b221ff86977c3bd171
Author: Chao Sun <[email protected]>
AuthorDate: Thu Nov 12 18:02:20 2020 -0800

    [SPARK-33435][SQL][3.0] DSv2: REFRESH TABLE should invalidate caches 
referencing the table
    
    ### What changes were proposed in this pull request?
    
    This is a backport for PR #30359.
    
    This changes `RefreshTableExec` in DSv2 to also invalidate caches with 
references to the target table to be refreshed. The change itself is similar to 
what's done in #30211. Note that though, since we currently don't support 
caching a DSv2 table directly, this doesn't add recache logic as in the DSv1 
impl. I marked it as a TODO for now.
    
    Note there are some conflicts in the backport: in branch-3.0 
`DataSourceV2Strategy` we don't have a `ResolvedTable` when analyzing 
`RefreshTable` so instead in `RefreshTableExec` this loads the table from the 
catalog if it exists, and the rest is the same.
    
    ### Why are the changes needed?
    
    Currently the behavior in DSv1 and DSv2 is inconsistent w.r.t refreshing 
table: in DSv1 we invalidate both metadata cache as well as all table caches 
that are related to the table, but in DSv2 we only do the former. This 
addresses the issue and makes the behavior consistent.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now refreshing a v2 table also invalidates all the related caches.
    
    ### How was this patch tested?
    
    Added a new UT.
    
    Closes #30360 from sunchao/SPARK-33435-branch-3.0.
    
    Authored-by: Chao Sun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../execution/datasources/v2/DataSourceV2Strategy.scala    |  2 +-
 .../sql/execution/datasources/v2/RefreshTableExec.scala    | 13 ++++++++++++-
 .../apache/spark/sql/connector/DataSourceV2SQLSuite.scala  | 14 ++++++++++++++
 3 files changed, 27 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index cca80c0..877aea1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -130,7 +130,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
       }
 
     case RefreshTable(catalog, ident) =>
-      RefreshTableExec(catalog, ident) :: Nil
+      RefreshTableExec(session, catalog, ident) :: Nil
 
     case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
       val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
index 2a19ff3..9717b9f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RefreshTableExec.scala
@@ -17,15 +17,26 @@
 
 package org.apache.spark.sql.execution.datasources.v2
 
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
 
 case class RefreshTableExec(
+    session: SparkSession,
     catalog: TableCatalog,
     ident: Identifier) extends V2CommandExec {
   override protected def run(): Seq[InternalRow] = {
     catalog.invalidateTable(ident)
+
+    if (catalog.tableExists(ident)) {
+      val table = catalog.loadTable(ident)
+      // invalidate all caches referencing the given table
+      // TODO(SPARK-33437): re-cache the table itself once we support caching a DSv2 table
+      val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
+      session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+    }
+
     Seq.empty
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ef5558a..df71a85 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1711,6 +1711,20 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-33435: REFRESH TABLE should invalidate all caches referencing the table") {
+    val tblName = "testcat.ns.t"
+    withTable(tblName) {
+      withTempView("t") {
+        sql(s"CREATE TABLE $tblName (id bigint) USING foo")
+        sql(s"CACHE TABLE t AS SELECT id FROM $tblName")
+
+        assert(spark.sharedState.cacheManager.lookupCachedData(spark.table("t")).isDefined)
+        sql(s"REFRESH TABLE $tblName")
+        assert(spark.sharedState.cacheManager.lookupCachedData(spark.table("t")).isEmpty)
+      }
+    }
+  }
+
   test("REPLACE TABLE: v1 table") {
     val e = intercept[AnalysisException] {
       sql(s"CREATE OR REPLACE TABLE tbl (a int) USING 
${classOf[SimpleScanSource].getName}")


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to