This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 97e65b9 [SPARK-33652][SQL] DSv2: DeleteFrom should refresh cache
97e65b9 is described below
commit 97e65b9733d0f30e45b58ef10bd4e39859c9877c
Author: Chao Sun <[email protected]>
AuthorDate: Sun Dec 6 01:14:22 2020 -0800
[SPARK-33652][SQL] DSv2: DeleteFrom should refresh cache
### What changes were proposed in this pull request?
This changes `DeleteFromTableExec` to also refresh caches referencing the
original table, by passing the `refreshCache` callback to the class. Note that
in order to construct the callback, I have to change `DataSourceV2ScanRelation`
to contain a `DataSourceV2Relation` instead of a `Table`.
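For orientation, a condensed sketch of the two signatures after this change, restating the diff below (imports and unchanged members are omitted, so it is not a standalone compilable unit):

```scala
// DataSourceV2ScanRelation now carries the whole relation rather than just the Table,
// so the planner can reach back to the relation when building a cache-refresh callback.
case class DataSourceV2ScanRelation(
    relation: DataSourceV2Relation,   // was: table: Table
    scan: Scan,
    output: Seq[AttributeReference]) extends LeafNode with NamedRelation {
  override def name: String = relation.table.name()
}

// DeleteFromTableExec accepts the callback and invokes it once the delete succeeds.
case class DeleteFromTableExec(
    table: SupportsDelete,
    condition: Array[Filter],
    refreshCache: () => Unit) extends V2CommandExec {
  override protected def run(): Seq[InternalRow] = {
    table.deleteWhere(condition)
    refreshCache()   // invalidate cached plans that reference the original table
    Seq.empty
  }
}
```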
### Why are the changes needed?
Currently a DSv2 delete from table doesn't refresh caches. This could lead to
correctness issues if the stale cache is queried later.
### Does this PR introduce _any_ user-facing change?
Yes. Delete from a v2 table now also refreshes caches that reference the table.
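A minimal sketch of the effect, assuming a v2 catalog is configured whose tables support DELETE (the catalog name `my_cat`, the implementation class, and the `foo` provider are hypothetical; the test added below exercises the same flow with the suite's in-memory `testcat`):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dsv2-delete-refreshes-cache")
  // Hypothetical: must point to a v2 catalog whose tables implement SupportsDelete.
  .config("spark.sql.catalog.my_cat", "com.example.MyCatalog")
  .getOrCreate()

spark.sql("CREATE TABLE my_cat.db.t (id BIGINT, data STRING) USING foo")
spark.sql("INSERT INTO my_cat.db.t VALUES (1L, 'a'), (2L, 'b'), (2L, 'c')")
spark.sql("CACHE TABLE v AS SELECT id FROM my_cat.db.t")
assert(spark.table("v").count() == 3)

spark.sql("DELETE FROM my_cat.db.t WHERE id = 2")
// Before this fix the cached view could keep serving the stale count of 3;
// with the cache refresh it now reflects the delete.
assert(spark.table("v").count() == 1)
```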
### How was this patch tested?
Added a test case.
Closes #30597 from sunchao/SPARK-33652.
Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit e857e06452c2cf478beb31367f76d6950b660ebb)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../execution/datasources/v2/DataSourceV2Relation.scala | 6 +++---
.../src/main/scala/org/apache/spark/sql/Dataset.scala | 4 ++--
.../execution/datasources/v2/DataSourceV2Strategy.scala | 5 +++--
.../execution/datasources/v2/DeleteFromTableExec.scala | 4 +++-
.../datasources/v2/V2ScanRelationPushDown.scala | 2 +-
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 16 ++++++++++++++++
6 files changed, 28 insertions(+), 9 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 4debdd3..513fce0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -111,16 +111,16 @@ case class DataSourceV2Relation(
* plan. This ensures that the stats that are used by the optimizer account for the filters and
* projection that will be pushed down.
*
- * @param table a DSv2 [[Table]]
+ * @param relation a [[DataSourceV2Relation]]
* @param scan a DSv2 [[Scan]]
* @param output the output attributes of this relation
*/
case class DataSourceV2ScanRelation(
- table: Table,
+ relation: DataSourceV2Relation,
scan: Scan,
output: Seq[AttributeReference]) extends LeafNode with NamedRelation {
- override def name: String = table.name()
+ override def name: String = relation.table.name()
override def simpleString(maxFields: Int): String = {
s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 0716043..05d6647 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -53,7 +53,7 @@ import org.apache.spark.sql.execution.aggregate.TypedAggregateExpression
import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2ScanRelation, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation, FileTable}
import org.apache.spark.sql.execution.python.EvaluatePython
import org.apache.spark.sql.execution.stat.StatFunctions
import org.apache.spark.sql.internal.SQLConf
@@ -3464,7 +3464,7 @@ class Dataset[T] private[sql](
fr.inputFiles
case r: HiveTableRelation =>
r.tableMeta.storage.locationUri.map(_.toString).toArray
- case DataSourceV2ScanRelation(table: FileTable, _, _) =>
+ case DataSourceV2ScanRelation(DataSourceV2Relation(table: FileTable, _, _, _, _), _, _) =>
table.fileIndex.inputFiles
}.flatten
files.toSet.toArray
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 938ba77..5289d35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -208,7 +208,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case DeleteFromTable(relation, condition) =>
relation match {
- case DataSourceV2ScanRelation(table, _, output) =>
+ case DataSourceV2ScanRelation(r, _, output) =>
+ val table = r.table
if (condition.exists(SubqueryExpression.hasSubquery)) {
throw new AnalysisException(
s"Delete by condition with subquery is not supported:
$condition")
@@ -227,7 +228,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
s"Cannot delete from table ${table.name} where ${filters.mkString("[", ", ", "]")}")
}
- DeleteFromTableExec(table.asDeletable, filters) :: Nil
+ DeleteFromTableExec(table.asDeletable, filters, refreshCache(r)) :: Nil
case _ =>
throw new AnalysisException("DELETE is only supported with v2
tables.")
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
index afebbfd..f0a45c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DeleteFromTableExec.scala
@@ -24,10 +24,12 @@ import org.apache.spark.sql.sources.Filter
case class DeleteFromTableExec(
table: SupportsDelete,
- condition: Array[Filter]) extends V2CommandExec {
+ condition: Array[Filter],
+ refreshCache: () => Unit) extends V2CommandExec {
override protected def run(): Seq[InternalRow] = {
table.deleteWhere(condition)
+ refreshCache()
Seq.empty
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
index b168e84..d218056 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2ScanRelationPushDown.scala
@@ -64,7 +64,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {
case _ => scan
}
- val scanRelation = DataSourceV2ScanRelation(relation.table, wrappedScan, output)
+ val scanRelation = DataSourceV2ScanRelation(relation, wrappedScan, output)
val projectionOverSchema = ProjectionOverSchema(output.toStructType)
val projectionFunc = (expr: Expression) => expr transformDown {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 6ef4fd1..6838a76 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1841,6 +1841,22 @@ class DataSourceV2SQLSuite
}
}
+ test("SPARK-33652: DeleteFrom should refresh caches referencing the table") {
+ val t = "testcat.ns1.ns2.tbl"
+ val view = "view"
+ withTable(t) {
+ withTempView(view) {
+ sql(s"CREATE TABLE $t (id bigint, data string, p int) USING foo
PARTITIONED BY (id, p)")
+ sql(s"INSERT INTO $t VALUES (2L, 'a', 2), (2L, 'b', 3), (3L, 'c', 3)")
+ sql(s"CACHE TABLE view AS SELECT id FROM $t")
+ assert(spark.table(view).count() == 3)
+
+ sql(s"DELETE FROM $t WHERE id = 2")
+ assert(spark.table(view).count() == 1)
+ }
+ }
+ }
+
test("UPDATE TABLE") {
val t = "testcat.ns1.ns2.tbl"
withTable(t) {