Repository: spark
Updated Branches:
  refs/heads/master ac586bbb0 -> 9ad0f6ea8
[SPARK-25269][SQL] SQL interface support specify StorageLevel when cache table

## What changes were proposed in this pull request?

Add support in the SQL interface for specifying a `StorageLevel` when caching a table. The syntax is:

```sql
CACHE TABLE tableName OPTIONS('storageLevel' 'DISK_ONLY');
```

All supported `StorageLevel`s are listed here:
https://github.com/apache/spark/blob/eefdf9f9dd8afde49ad7d4e230e2735eb817ab0a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala#L172-L183

## How was this patch tested?

Unit tests and manual tests.

Manual test configuration:
```
--executor-memory 15G --executor-cores 5 --num-executors 50
```
Data: Input Size / Records: 1037.7 GB / 11732805788

Result:
![image](https://user-images.githubusercontent.com/5399861/47213362-56a1c980-d3cd-11e8-82e7-28d7abc5923e.png)

Closes #22263 from wangyum/SPARK-25269.

Authored-by: Yuming Wang <yumw...@ebay.com>
Signed-off-by: Dongjoon Hyun <dongj...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ad0f6ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ad0f6ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ad0f6ea

Branch: refs/heads/master
Commit: 9ad0f6ea89435391ec16e436bc4c4d5bf6b68493
Parents: ac586bb
Author: Yuming Wang <yumw...@ebay.com>
Authored: Fri Oct 19 09:15:55 2018 -0700
Committer: Dongjoon Hyun <dongj...@apache.org>
Committed: Fri Oct 19 09:15:55 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  3 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  3 +-
 .../spark/sql/execution/command/cache.scala     | 23 +++++++-
 .../org/apache/spark/sql/CachedTableSuite.scala | 60 ++++++++++++++++++++
 .../apache/spark/sql/hive/test/TestHive.scala   |  2 +-
 5 files changed, 86 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
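For a quick end-to-end illustration, here is a minimal sketch of the new syntax from the Scala side. It assumes a build that includes this commit and uses `Dataset.storageLevel` (available since 2.4.0) to confirm the level that was applied; the table name and data are illustrative:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object CacheTableStorageLevelExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("CacheTableStorageLevelExample")
      .getOrCreate()
    import spark.implicits._

    Seq(1, 2, 3).toDF("id").createOrReplaceTempView("testData")

    // New syntax from this commit: cache the table at DISK_ONLY.
    spark.sql("CACHE TABLE testData OPTIONS('storageLevel' 'DISK_ONLY')")

    // Dataset.storageLevel reports the level registered for the cached plan.
    assert(spark.table("testData").storageLevel == StorageLevel.DISK_ONLY)

    spark.sql("UNCACHE TABLE testData")
    spark.stop()
  }
}
```

Without an `OPTIONS` clause the behavior is unchanged: the command falls through to the plain `sparkSession.catalog.cacheTable` call and its default storage level.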
http://git-wip-us.apache.org/repos/asf/spark/blob/9ad0f6ea/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 0569986..e2d34d1 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -162,7 +162,8 @@ statement
         tableIdentifier partitionSpec? describeColName?                #describeTable
     | REFRESH TABLE tableIdentifier                                    #refreshTable
     | REFRESH (STRING | .*?)                                           #refreshResource
-    | CACHE LAZY? TABLE tableIdentifier (AS? query)?                   #cacheTable
+    | CACHE LAZY? TABLE tableIdentifier
+        (OPTIONS options=tablePropertyList)? (AS? query)?              #cacheTable
     | UNCACHE TABLE (IF EXISTS)? tableIdentifier                       #uncacheTable
     | CLEAR CACHE                                                      #clearCache
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
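To make the grammar change concrete, these are the shapes the amended `#cacheTable` rule now accepts, written as a spark-shell sketch (table names and the query are illustrative; `spark` is the ambient SparkSession):

```scala
spark.range(10).createOrReplaceTempView("t")

spark.sql("CACHE TABLE t")                                       // original form
spark.sql("UNCACHE TABLE t")
spark.sql("CACHE LAZY TABLE t")                                  // LAZY, no options
spark.sql("UNCACHE TABLE t")
spark.sql("CACHE TABLE t OPTIONS('storageLevel' 'MEMORY_ONLY')") // new: OPTIONS
spark.sql("CACHE TABLE t2 OPTIONS('storageLevel' 'DISK_ONLY') AS SELECT * FROM t")
```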
http://git-wip-us.apache.org/repos/asf/spark/blob/9ad0f6ea/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 4ed14d3..364efea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -282,7 +282,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       throw new ParseException(s"It is not allowed to add database prefix `$database` to " +
         s"the table name in CACHE TABLE AS SELECT", ctx)
     }
-    CacheTableCommand(tableIdent, query, ctx.LAZY != null)
+    val options = Option(ctx.options).map(visitPropertyKeyValues).getOrElse(Map.empty)
+    CacheTableCommand(tableIdent, query, ctx.LAZY != null, options)
   }

   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9ad0f6ea/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 6b00426..728604a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -17,16 +17,21 @@

 package org.apache.spark.sql.execution.command

+import java.util.Locale
+
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
+import org.apache.spark.storage.StorageLevel

 case class CacheTableCommand(
     tableIdent: TableIdentifier,
     plan: Option[LogicalPlan],
-    isLazy: Boolean) extends RunnableCommand {
+    isLazy: Boolean,
+    options: Map[String, String]) extends RunnableCommand {
   require(plan.isEmpty || tableIdent.database.isEmpty,
     "Database name is not allowed in CACHE TABLE AS SELECT")

@@ -36,7 +41,21 @@ case class CacheTableCommand(
     plan.foreach { logicalPlan =>
       Dataset.ofRows(sparkSession, logicalPlan).createTempView(tableIdent.quotedString)
     }
-    sparkSession.catalog.cacheTable(tableIdent.quotedString)
+
+    val storageLevelKey = "storagelevel"
+    val storageLevelValue =
+      CaseInsensitiveMap(options).get(storageLevelKey).map(_.toUpperCase(Locale.ROOT))
+    val withoutStorageLevel = options.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
+    if (withoutStorageLevel.nonEmpty) {
+      logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
+    }
+
+    if (storageLevelValue.nonEmpty) {
+      sparkSession.catalog.cacheTable(
+        tableIdent.quotedString, StorageLevel.fromString(storageLevelValue.get))
+    } else {
+      sparkSession.catalog.cacheTable(tableIdent.quotedString)
+    }

     if (!isLazy) {
       // Performs eager caching
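The option handling above boils down to a case-insensitive lookup of one recognized key plus a warning for everything else. Here is a standalone sketch of the same logic, substituting a case-insensitive match for Catalyst's internal `CaseInsensitiveMap` and `println` for `logWarning`:

```scala
import java.util.Locale

import org.apache.spark.storage.StorageLevel

def resolveStorageLevel(options: Map[String, String]): Option[StorageLevel] = {
  val key = "storagelevel"
  val unknown = options.filterKeys(_.toLowerCase(Locale.ROOT) != key)
  if (unknown.nonEmpty) {
    // The command only warns on unrecognized keys; it does not fail.
    println(s"Invalid options: ${unknown.mkString(", ")}")
  }
  options.collectFirst {
    // Match the key case-insensitively, as CaseInsensitiveMap does.
    case (k, v) if k.toLowerCase(Locale.ROOT) == key =>
      // StorageLevel.fromString throws IllegalArgumentException on bad names.
      StorageLevel.fromString(v.toUpperCase(Locale.ROOT))
  }
}

resolveStorageLevel(Map("storageLevel" -> "disk_only")) // Some(DISK_ONLY)
resolveStorageLevel(Map.empty)                          // None
```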
http://git-wip-us.apache.org/repos/asf/spark/blob/9ad0f6ea/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 60c73df..6e805c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -22,6 +22,8 @@ import scala.concurrent.duration._
 import scala.language.postfixOps

 import org.apache.spark.CleanerListener
+import org.apache.spark.executor.DataReadMethod._
+import org.apache.spark.executor.DataReadMethod.DataReadMethod
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
@@ -64,6 +66,13 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     maybeBlock.nonEmpty
   }

+  def isExpectStorageLevel(rddId: Int, level: DataReadMethod): Boolean = {
+    val maybeBlock = sparkContext.env.blockManager.get(RDDBlockId(rddId, 0))
+    val isExpectLevel = maybeBlock.forall(_.readMethod === level)
+    maybeBlock.foreach(_ => sparkContext.env.blockManager.releaseLock(RDDBlockId(rddId, 0)))
+    maybeBlock.nonEmpty && isExpectLevel
+  }
+
   private def getNumInMemoryRelations(ds: Dataset[_]): Int = {
     val plan = ds.queryExecution.withCachedData
     var sum = plan.collect { case _: InMemoryRelation => 1 }.sum
@@ -288,6 +297,57 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     }
   }

+  private def assertStorageLevel(cacheOptions: String, level: DataReadMethod): Unit = {
+    sql(s"CACHE TABLE testData OPTIONS$cacheOptions")
+    assertCached(spark.table("testData"))
+    val rddId = rddIdOf("testData")
+    assert(isExpectStorageLevel(rddId, level))
+  }
+
+  test("SQL interface support storageLevel(DISK_ONLY)") {
+    assertStorageLevel("('storageLevel' 'DISK_ONLY')", Disk)
+  }
+
+  test("SQL interface support storageLevel(DISK_ONLY) with invalid options") {
+    assertStorageLevel("('storageLevel' 'DISK_ONLY', 'a' '1', 'b' '2')", Disk)
+  }
+
+  test("SQL interface support storageLevel(MEMORY_ONLY)") {
+    assertStorageLevel("('storageLevel' 'MEMORY_ONLY')", Memory)
+  }
+
+  test("SQL interface cache SELECT ... support storageLevel(DISK_ONLY)") {
+    withTempView("testCacheSelect") {
+      sql("CACHE TABLE testCacheSelect OPTIONS('storageLevel' 'DISK_ONLY') SELECT * FROM testData")
+      assertCached(spark.table("testCacheSelect"))
+      val rddId = rddIdOf("testCacheSelect")
+      assert(isExpectStorageLevel(rddId, Disk))
+    }
+  }
+
+  test("SQL interface support storageLevel(Invalid StorageLevel)") {
+    val message = intercept[IllegalArgumentException] {
+      sql("CACHE TABLE testData OPTIONS('storageLevel' 'invalid_storage_level')")
+    }.getMessage
+    assert(message.contains("Invalid StorageLevel: INVALID_STORAGE_LEVEL"))
+  }
+
+  test("SQL interface support storageLevel(with LAZY)") {
+    sql("CACHE LAZY TABLE testData OPTIONS('storageLevel' 'disk_only')")
+    assertCached(spark.table("testData"))
+
+    val rddId = rddIdOf("testData")
+    assert(
+      !isMaterialized(rddId),
+      "Lazily cached in-memory table shouldn't be materialized eagerly")
+
+    sql("SELECT COUNT(*) FROM testData").collect()
+    assert(
+      isMaterialized(rddId),
+      "Lazily cached in-memory table should have been materialized")
+    assert(isExpectStorageLevel(rddId, Disk))
+  }
+
   test("InMemoryRelation statistics") {
     sql("CACHE TABLE testData")
     spark.table("testData").queryExecution.withCachedData.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/9ad0f6ea/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 71f15a4..634b3db 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -571,7 +571,7 @@ private[hive] class TestHiveQueryExecution(

   override lazy val analyzed: LogicalPlan = {
     val describedTables = logical match {
-      case CacheTableCommand(tbl, _, _) => tbl.table :: Nil
+      case CacheTableCommand(tbl, _, _, _) => tbl.table :: Nil
       case _ => Nil
     }
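Finally, the "Invalid StorageLevel" test in the suite above fails inside `StorageLevel.fromString`, which can be checked in isolation with a spark-shell sketch:

```scala
import org.apache.spark.storage.StorageLevel

try {
  StorageLevel.fromString("INVALID_STORAGE_LEVEL")
} catch {
  case e: IllegalArgumentException =>
    // Matches the message the suite asserts on:
    // "Invalid StorageLevel: INVALID_STORAGE_LEVEL"
    println(e.getMessage)
}
```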