This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 80716d1 [SPARK-33228][SQL] Don't uncache data when replacing a view
having the same logical plan
80716d1 is described below
commit 80716d1a216da12bba0ceeab8c11895d37d5559b
Author: Takeshi Yamamuro <[email protected]>
AuthorDate: Sun Oct 25 16:15:55 2020 -0700
[SPARK-33228][SQL] Don't uncache data when replacing a view having the same
logical plan
### What changes were proposed in this pull request?
SPARK-30494 updated the `CreateViewCommand` code to implicitly drop the cache
when replacing an existing view. But this change drops the cache even when
replacing a view having the same logical plan. A sequence of queries to
reproduce this is as follows:
```
// Spark v2.4.6+
scala> val df = spark.range(1).selectExpr("id a", "id b")
scala> df.cache()
scala> df.explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [a#2L, b#3L]
+- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory,
deserialized, 1 replicas)
+- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) ColumnarToRow
+- InMemoryTableScan [a#2L, b#3L]
+- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory,
deserialized, 1 replicas)
+- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)
// If one re-runs the same query `df.createOrReplaceTempView("t")`, the
cache is swept away
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)
// Until v2.4.6
scala> val df = spark.range(1).selectExpr("id a", "id b")
scala> df.cache()
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
20/10/23 22:33:42 WARN ObjectStore: Failed to get database global_temp,
returning NoSuchObjectException
== Physical Plan ==
*(1) InMemoryTableScan [a#2L, b#3L]
+- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory,
deserialized, 1 replicas)
+- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)
scala> df.createOrReplaceTempView("t")
scala> sql("select * from t").explain()
== Physical Plan ==
*(1) InMemoryTableScan [a#2L, b#3L]
+- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory,
deserialized, 1 replicas)
+- *(1) Project [id#0L AS a#2L, id#0L AS b#3L]
+- *(1) Range (0, 1, step=1, splits=4)
```
### Why are the changes needed?
bugfix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added tests.
Closes #30140 from maropu/FixBugInReplaceView.
Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 87b498462b82fce02dd50286887092cf7858d2e8)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../apache/spark/sql/execution/command/views.scala | 10 +++++----
.../org/apache/spark/sql/CachedTableSuite.scala | 24 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 4 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 23f1d6c..0ba76ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -110,17 +110,19 @@ case class CreateViewCommand(
verifyTemporaryObjectsNotExists(catalog)
if (viewType == LocalTempView) {
- if (replace && catalog.getTempView(name.table).isDefined) {
- logDebug(s"Try to uncache ${name.quotedString} before replacing.")
+ if (replace && catalog.getTempView(name.table).isDefined &&
+ !catalog.getTempView(name.table).get.sameResult(child)) {
+ logInfo(s"Try to uncache ${name.quotedString} before replacing.")
CommandUtils.uncacheTableOrView(sparkSession, name.quotedString)
}
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
catalog.createTempView(name.table, aliasedPlan, overrideIfExists =
replace)
} else if (viewType == GlobalTempView) {
- if (replace && catalog.getGlobalTempView(name.table).isDefined) {
+ if (replace && catalog.getGlobalTempView(name.table).isDefined &&
+ !catalog.getGlobalTempView(name.table).get.sameResult(child)) {
val db =
sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE)
val globalTempView = TableIdentifier(name.table, Option(db))
- logDebug(s"Try to uncache ${globalTempView.quotedString} before
replacing.")
+ logInfo(s"Try to uncache ${globalTempView.quotedString} before
replacing.")
CommandUtils.uncacheTableOrView(sparkSession,
globalTempView.quotedString)
}
val aliasedPlan = aliasPlan(sparkSession, analyzedPlan)
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 20f2a7f..adc725e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -1184,4 +1184,28 @@ class CachedTableSuite extends QueryTest with
SQLTestUtils
assert(spark.sharedState.cacheManager.isEmpty)
}
}
+
+ test("SPARK-33228: Don't uncache data when replacing an existing view having
the same plan") {
+ withTempView("tempView") {
+ spark.catalog.clearCache()
+ val df = spark.range(1).selectExpr("id a", "id b")
+ df.cache()
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ df.createOrReplaceTempView("tempView")
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ df.createOrReplaceTempView("tempView")
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ }
+
+ withTempView("tempGlobalTempView") {
+ spark.catalog.clearCache()
+ val df = spark.range(1).selectExpr("id a", "id b")
+ df.cache()
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ df.createOrReplaceGlobalTempView("tempGlobalTempView")
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ df.createOrReplaceGlobalTempView("tempGlobalTempView")
+ assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]