Repository: spark Updated Branches: refs/heads/master 06232d23f -> 9081b9f9f
[SPARK-2189][SQL] Adds dropTempTable API This PR adds an API for unregistering temporary tables. If a temporary table has been cached before, it's unpersisted as well. Author: Cheng Lian <[email protected]> Closes #3039 from liancheng/unregister-temp-table and squashes the following commits: 54ae99f [Cheng Lian] Fixes Scala styling issue 1948c14 [Cheng Lian] Removes the unpersist argument aca41d3 [Cheng Lian] Ensures thread safety 7d4fb2b [Cheng Lian] Adds unregisterTempTable API Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9081b9f9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9081b9f9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9081b9f9 Branch: refs/heads/master Commit: 9081b9f9f79b78f0b20a5fc3bc4e7c1d3e717130 Parents: 06232d2 Author: Cheng Lian <[email protected]> Authored: Sun Nov 2 16:00:24 2014 -0800 Committer: Michael Armbrust <[email protected]> Committed: Sun Nov 2 16:00:24 2014 -0800 ---------------------------------------------------------------------- .../org/apache/spark/sql/CacheManager.scala | 13 +++++++++++++ .../scala/org/apache/spark/sql/SQLContext.scala | 13 +++++++++++++ .../org/apache/spark/sql/CachedTableSuite.scala | 20 ++++++++++++++++++++ 3 files changed, 46 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9081b9f9/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala index 3ced11a..2e7abac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala @@ -103,6 +103,19 @@ private[sql] trait CacheManager { cachedData.remove(dataIndex) } + /** Tries to remove the data for the given SchemaRDD from the cache if it's cached */ + private[sql] def tryUncacheQuery( + query: SchemaRDD, + blocking: Boolean = true): Boolean = writeLock { + val planToCache = query.queryExecution.analyzed + val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) + val found = dataIndex >= 0 + if (found) { + cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking) + cachedData.remove(dataIndex) + } + found + } /** Optionally returns cached data for the given SchemaRDD */ private[sql] def lookupCachedData(query: SchemaRDD): Option[CachedData] = readLock { http://git-wip-us.apache.org/repos/asf/spark/blob/9081b9f9/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 4cded98..3cf6af5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -277,6 +277,19 @@ class SQLContext(@transient val sparkContext: SparkContext) } /** + * Drops the temporary table with the given table name in the catalog. If the table has been + * cached/persisted before, it's also unpersisted. + * + * @param tableName the name of the table to be unregistered. + * + * @group userf + */ + def dropTempTable(tableName: String): Unit = { + tryUncacheQuery(table(tableName)) + catalog.unregisterTable(None, tableName) + } + + /** * Executes a SQL query using Spark, returning the result as a SchemaRDD. The dialect that is * used for SQL parsing can be configured with 'spark.sql.dialect'. * http://git-wip-us.apache.org/repos/asf/spark/blob/9081b9f9/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 44a2961..765fa82 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -231,4 +231,24 @@ class CachedTableSuite extends QueryTest { assert(cached.statistics.sizeInBytes === actualSizeInBytes) } } + + test("Drops temporary table") { + testData.select('key).registerTempTable("t1") + table("t1") + dropTempTable("t1") + assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found")) + } + + test("Drops cached temporary table") { + testData.select('key).registerTempTable("t1") + testData.select('key).registerTempTable("t2") + cacheTable("t1") + + assert(isCached("t1")) + assert(isCached("t2")) + + dropTempTable("t1") + assert(intercept[RuntimeException](table("t1")).getMessage.startsWith("Table Not Found")) + assert(!isCached("t2")) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
