This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new d1f4a34 [SPARK-33228][SQL] Don't uncache data when replacing a view having the same logical plan d1f4a34 is described below commit d1f4a34e9c17f9521177a45c53139dc502bd11b4 Author: Takeshi Yamamuro <yamam...@apache.org> AuthorDate: Sun Oct 25 16:15:55 2020 -0700 [SPARK-33228][SQL] Don't uncache data when replacing a view having the same logical plan ### What changes were proposed in this pull request? SPARK-30494 updated the `CreateViewCommand` code to implicitly drop the cache when replacing an existing view. However, this change drops the cache even when replacing a view having the same logical plan. The following sequence of queries reproduces this: ``` // Spark v2.4.6+ scala> val df = spark.range(1).selectExpr("id a", "id b") scala> df.cache() scala> df.explain() == Physical Plan == *(1) ColumnarToRow +- InMemoryTableScan [a#2L, b#3L] +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L] +- *(1) Range (0, 1, step=1, splits=4) scala> df.createOrReplaceTempView("t") scala> sql("select * from t").explain() == Physical Plan == *(1) ColumnarToRow +- InMemoryTableScan [a#2L, b#3L] +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L] +- *(1) Range (0, 1, step=1, splits=4) // If one re-runs the same query `df.createOrReplaceTempView("t")`, the cache is swept away scala> df.createOrReplaceTempView("t") scala> sql("select * from t").explain() == Physical Plan == *(1) Project [id#0L AS a#2L, id#0L AS b#3L] +- *(1) Range (0, 1, step=1, splits=4) // Until v2.4.6 scala> val df = spark.range(1).selectExpr("id a", "id b") scala> df.cache() scala> df.createOrReplaceTempView("t") scala> sql("select * from t").explain() 20/10/23 22:33:42 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException == Physical Plan == *(1) 
InMemoryTableScan [a#2L, b#3L] +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L] +- *(1) Range (0, 1, step=1, splits=4) scala> df.createOrReplaceTempView("t") scala> sql("select * from t").explain() == Physical Plan == *(1) InMemoryTableScan [a#2L, b#3L] +- InMemoryRelation [a#2L, b#3L], StorageLevel(disk, memory, deserialized, 1 replicas) +- *(1) Project [id#0L AS a#2L, id#0L AS b#3L] +- *(1) Range (0, 1, step=1, splits=4) ``` ### Why are the changes needed? bugfix. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added tests. Closes #30140 from maropu/FixBugInReplaceView. Authored-by: Takeshi Yamamuro <yamam...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../apache/spark/sql/execution/command/views.scala | 10 +++++---- .../org/apache/spark/sql/CachedTableSuite.scala | 24 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 5d9f2c3..9cb4003 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -143,17 +143,19 @@ case class CreateViewCommand( val catalog = sparkSession.sessionState.catalog if (viewType == LocalTempView) { - if (replace && catalog.getTempView(name.table).isDefined) { - logDebug(s"Try to uncache ${name.quotedString} before replacing.") + if (replace && catalog.getTempView(name.table).isDefined && + !catalog.getTempView(name.table).get.sameResult(child)) { + logInfo(s"Try to uncache ${name.quotedString} before replacing.") CommandUtils.uncacheTableOrView(sparkSession, name.quotedString) } val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) catalog.createTempView(name.table, aliasedPlan, overrideIfExists = 
replace) } else if (viewType == GlobalTempView) { - if (replace && catalog.getGlobalTempView(name.table).isDefined) { + if (replace && catalog.getGlobalTempView(name.table).isDefined && + !catalog.getGlobalTempView(name.table).get.sameResult(child)) { val db = sparkSession.sessionState.conf.getConf(StaticSQLConf.GLOBAL_TEMP_DATABASE) val globalTempView = TableIdentifier(name.table, Option(db)) - logDebug(s"Try to uncache ${globalTempView.quotedString} before replacing.") + logInfo(s"Try to uncache ${globalTempView.quotedString} before replacing.") CommandUtils.uncacheTableOrView(sparkSession, globalTempView.quotedString) } val aliasedPlan = aliasPlan(sparkSession, analyzedPlan) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 5c8c857..28c0fa4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -907,4 +907,28 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext assert(spark.sharedState.cacheManager.isEmpty) } } + + test("SPARK-33228: Don't uncache data when replacing an existing view having the same plan") { + withTempView("tempView") { + spark.catalog.clearCache() + val df = spark.range(1).selectExpr("id a", "id b") + df.cache() + assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined) + df.createOrReplaceTempView("tempView") + assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined) + df.createOrReplaceTempView("tempView") + assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined) + } + + withTempView("tempGlobalTempView") { + spark.catalog.clearCache() + val df = spark.range(1).selectExpr("id a", "id b") + df.cache() + assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined) + df.createOrReplaceGlobalTempView("tempGlobalTempView") + 
assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined) + df.createOrReplaceGlobalTempView("tempGlobalTempView") + assert(spark.sharedState.cacheManager.lookupCachedData(df).isDefined) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org