This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 97e65b9  [SPARK-33652][SQL] DSv2: DeleteFrom should refresh cache
97e65b9 is described below

commit 97e65b9733d0f30e45b58ef10bd4e39859c9877c
Author: Chao Sun <[email protected]>
AuthorDate: Sun Dec 6 01:14:22 2020 -0800

    [SPARK-33652][SQL] DSv2: DeleteFrom should refresh cache
    
    ### What changes were proposed in this pull request?
    
    This changes `DeleteFromTableExec` to also refresh caches referencing the 
original table, by passing the `refreshCache` callback to the class. Note that 
in order to construct the callback, I have to change `DataSourceV2ScanRelation` 
to contain a `DataSourceV2Relation` instead of a `Table`.
    
    ### Why are the changes needed?
    
    Currently DSv2 delete from table doesn't refresh caches. This could lead to 
correctness issue if the staled cache is queried later.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Now delete from table in v2 also refreshes cache.
    
    ### How was this patch tested?
    
    Added a test case.
    
    Closes #30597 from sunchao/SPARK-33652.
    
    Authored-by: Chao Sun <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit e857e06452c2cf478beb31367f76d6950b660ebb)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../execution/datasources/v2/DataSourceV2Relation.scala  |  6 +++---
 .../src/main/scala/org/apache/spark/sql/Dataset.scala    |  4 ++--
 .../execution/datasources/v2/DataSourceV2Strategy.scala  |  5 +++--
 .../execution/datasources/v2/DeleteFromTableExec.scala   |  4 +++-
 .../datasources/v2/V2ScanRelationPushDown.scala          |  2 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala       | 16 ++++++++++++++++
 6 files changed, 28 insertions(+), 9 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 4debdd3..513fce0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -111,16 +111,16 @@ case class DataSourceV2Relation(
  * plan. This ensures that the stats that are used by the optimizer account 
for the filters and
  * projection that will be pushed down.
  *
- * @param table a DSv2 [[Table]]
+ * @param relation a [[DataSourceV2Relation]]
  * @param scan a DSv2 [[Scan]]
  * @param output the output attributes of this relation
  */
 case class DataSourceV2ScanRelation(
-    table: Table,
+    relation: DataSourceV2Relation,
     scan: Scan,
     output: Seq[AttributeReference]) extends LeafNode with NamedRelation {
 
-  override def name: String = table.name()
+  override def name: String = relation.table.name()
 
   override def simpleString(maxFields: Int): String = {
     s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0716043..05d6647 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -53,7 +53,7 @@ import 
org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
 import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, 
ArrowConverters}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, 
FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, 
DataSourceV2ScanRelation, FileTable}
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.internal.SQLConf
@@ -3464,7 +3464,7 @@ class Dataset[T] private[sql](
         fr.inputFiles
       case r: HiveTableRelation =>
         r.tableMeta.storage.locationUri.map(_.toString).toArray
-      case DataSourceV2ScanRelation(table: FileTable, _, _) =>
+      case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, 
_, _, _), _, _) =>
         table.fileIndex.inputFiles
     }.flatten
     files.toSet.toArray
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 938ba77..5289d35 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -208,7 +208,8 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
 
     case DeleteFromTable(relation, condition) =>
       relation match {
-        case DataSourceV2ScanRelation(table, _, output) =>
+        case DataSourceV2ScanRelation(r, _, output) =>
+          val table = r.table
           if (condition.exists(SubqueryExpression.hasSubquery)) {
             throw new AnalysisException(
               s"Delete by condition with subquery is not supported: 
$condition")
@@ -227,7 +228,7 @@ class DataSourceV2Strategy(session: SparkSession) extends 
Strategy with Predicat
               s"Cannot delete from table ${table.name} where 
${filters.mkString("[", ", ", "]")}")
           }
 
-          DeleteFromTableExec(table.asDeletable, filters) :: Nil
+          DeleteFromTableExec(table.asDeletable, filters, refreshCache(r)) :: 
Nil
         case _ =>
           throw new AnalysisException("DELETE is only supported with v2 
tables.")
       }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
index afebbfd..f0a45c2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
@@ -24,10 +24,12 @@ import org.apache.spark.sql.sources.Filter
 
 case class DeleteFromTableExec(
     table: SupportsDelete,
-    condition: Array[Filter]) extends V2CommandExec {
+    condition: Array[Filter],
+    refreshCache: () => Unit) extends V2CommandExec {
 
   override protected def run(): Seq[InternalRow] = {
     table.deleteWhere(condition)
+    refreshCache()
     Seq.empty
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index b168e84..d218056 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -64,7 +64,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
         case _ => scan
       }
 
-      val scanRelation = DataSourceV2ScanRelation(relation.table, wrappedScan, 
output)
+      val scanRelation = DataSourceV2ScanRelation(relation, wrappedScan, 
output)
 
       val projectionOverSchema = ProjectionOverSchema(output.toStructType)
       val projectionFunc = (expr: Expression) => expr transformDown {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 6ef4fd1..6838a76 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1841,6 +1841,22 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-33652: DeleteFrom should refresh caches referencing the table") {
+    val t = "testcat.ns1.ns2.tbl"
+    val view = "view"
+    withTable(t) {
+      withTempView(view) {
+        sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo 
PARTITIONED BY (id, p)")
+        sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+        sql(s"CACHE TABLE view AS SELECT id FROM $t")
+        assert(spark.table(view).count() == 3)
+
+        sql(s"DELETE FROM $t WHERE id = 2")
+        assert(spark.table(view).count() == 1)
+      }
+    }
+  }
+
   test("UPDATE TABLE") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to