Repository: spark
Updated Branches:
  refs/heads/master 58f5361ca -> 34b97a067


[SPARK-3645][SQL] Makes table caching eager by default and adds syntax for lazy 
caching

Although lazy caching for in-memory tables seems consistent with the 
`RDD.cache()` API, it can be confusing for users who mainly work with SQL and 
are not familiar with Spark internals. The `CACHE TABLE t; SELECT COUNT(*) 
FROM t;` pattern is also commonly used just to ensure predictable performance.

This PR makes both the `CACHE TABLE t [AS SELECT ...]` statement and the 
`SQLContext.cacheTable()` API eager by default, and adds a new `CACHE LAZY 
TABLE t [AS SELECT ...]` syntax to provide lazy in-memory table caching.
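
For example (a minimal sketch, assuming a `SQLContext` named `sqlContext` and 
a table `t` registered with it; the comments describe the intended behavior):

    sqlContext.sql("CACHE TABLE t")              // eager: materialized immediately
    sqlContext.sql("CACHE LAZY TABLE t")         // lazy: materialized on first use
    sqlContext.sql("CACHE TABLE t2 AS SELECT * FROM t")  // also eager by default
    sqlContext.cacheTable("t")                   // programmatic API, now also eager
    sqlContext.sql("UNCACHE TABLE t")            // drops the in-memory data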

This PR also takes the opportunity to do some refactoring: `CacheCommand` and 
`CacheTableAsSelectCommand` are merged and renamed to `CacheTableCommand`, 
since the former is strictly a special case of the latter. A new 
`UncacheTableCommand` is added for the `UNCACHE TABLE t` statement.
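
After this change the logical plan nodes are (as defined in catalyst's 
commands.scala in the diff below):

    case class CacheTableCommand(tableName: String, plan: Option[LogicalPlan], isLazy: Boolean)
      extends Command

    case class UncacheTableCommand(tableName: String) extends Command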

Author: Cheng Lian <lian.cs....@gmail.com>

Closes #2513 from liancheng/eager-caching and squashes the following commits:

fe92287 [Cheng Lian] Makes table caching eager by default and adds syntax for 
lazy caching


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34b97a06
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34b97a06
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34b97a06

Branch: refs/heads/master
Commit: 34b97a067d1b370fbed8ecafab2f48501a35d783
Parents: 58f5361
Author: Cheng Lian <lian.cs....@gmail.com>
Authored: Sun Oct 5 17:51:59 2014 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Sun Oct 5 17:51:59 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/SqlParser.scala   |  45 +++---
 .../spark/sql/catalyst/analysis/Catalog.scala   |   2 +-
 .../sql/catalyst/plans/logical/commands.scala   |  15 +-
 .../org/apache/spark/sql/CacheManager.scala     |   9 +-
 .../columnar/InMemoryColumnarTableScan.scala    |   2 +-
 .../spark/sql/execution/SparkStrategies.scala   |   8 +-
 .../apache/spark/sql/execution/commands.scala   |  47 +++---
 .../org/apache/spark/sql/CachedTableSuite.scala | 145 +++++++++++++------
 .../spark/sql/hive/ExtendedHiveQlParser.scala   |  66 ++++-----
 .../org/apache/spark/sql/hive/TestHive.scala    |   6 +-
 .../spark/sql/hive/CachedTableSuite.scala       |  78 +++++++---
 11 files changed, 265 insertions(+), 158 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/34b97a06/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 2633633..854b5b4 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -67,11 +67,12 @@ class SqlParser extends StandardTokenParsers with 
PackratParsers {
   protected implicit def asParser(k: Keyword): Parser[String] =
     lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
 
+  protected val ABS = Keyword("ABS")
   protected val ALL = Keyword("ALL")
   protected val AND = Keyword("AND")
+  protected val APPROXIMATE = Keyword("APPROXIMATE")
   protected val AS = Keyword("AS")
   protected val ASC = Keyword("ASC")
-  protected val APPROXIMATE = Keyword("APPROXIMATE")
   protected val AVG = Keyword("AVG")
   protected val BETWEEN = Keyword("BETWEEN")
   protected val BY = Keyword("BY")
@@ -80,9 +81,9 @@ class SqlParser extends StandardTokenParsers with 
PackratParsers {
   protected val COUNT = Keyword("COUNT")
   protected val DESC = Keyword("DESC")
   protected val DISTINCT = Keyword("DISTINCT")
+  protected val EXCEPT = Keyword("EXCEPT")
   protected val FALSE = Keyword("FALSE")
   protected val FIRST = Keyword("FIRST")
-  protected val LAST = Keyword("LAST")
   protected val FROM = Keyword("FROM")
   protected val FULL = Keyword("FULL")
   protected val GROUP = Keyword("GROUP")
@@ -91,42 +92,42 @@ class SqlParser extends StandardTokenParsers with 
PackratParsers {
   protected val IN = Keyword("IN")
   protected val INNER = Keyword("INNER")
   protected val INSERT = Keyword("INSERT")
+  protected val INTERSECT = Keyword("INTERSECT")
   protected val INTO = Keyword("INTO")
   protected val IS = Keyword("IS")
   protected val JOIN = Keyword("JOIN")
+  protected val LAST = Keyword("LAST")
+  protected val LAZY = Keyword("LAZY")
   protected val LEFT = Keyword("LEFT")
+  protected val LIKE = Keyword("LIKE")
   protected val LIMIT = Keyword("LIMIT")
+  protected val LOWER = Keyword("LOWER")
   protected val MAX = Keyword("MAX")
   protected val MIN = Keyword("MIN")
   protected val NOT = Keyword("NOT")
   protected val NULL = Keyword("NULL")
   protected val ON = Keyword("ON")
   protected val OR = Keyword("OR")
-  protected val OVERWRITE = Keyword("OVERWRITE")
-  protected val LIKE = Keyword("LIKE")
-  protected val RLIKE = Keyword("RLIKE")
-  protected val UPPER = Keyword("UPPER")
-  protected val LOWER = Keyword("LOWER")
-  protected val REGEXP = Keyword("REGEXP")
   protected val ORDER = Keyword("ORDER")
   protected val OUTER = Keyword("OUTER")
+  protected val OVERWRITE = Keyword("OVERWRITE")
+  protected val REGEXP = Keyword("REGEXP")
   protected val RIGHT = Keyword("RIGHT")
+  protected val RLIKE = Keyword("RLIKE")
   protected val SELECT = Keyword("SELECT")
   protected val SEMI = Keyword("SEMI")
+  protected val SQRT = Keyword("SQRT")
   protected val STRING = Keyword("STRING")
+  protected val SUBSTR = Keyword("SUBSTR")
+  protected val SUBSTRING = Keyword("SUBSTRING")
   protected val SUM = Keyword("SUM")
   protected val TABLE = Keyword("TABLE")
   protected val TIMESTAMP = Keyword("TIMESTAMP")
   protected val TRUE = Keyword("TRUE")
   protected val UNCACHE = Keyword("UNCACHE")
   protected val UNION = Keyword("UNION")
+  protected val UPPER = Keyword("UPPER")
   protected val WHERE = Keyword("WHERE")
-  protected val INTERSECT = Keyword("INTERSECT")
-  protected val EXCEPT = Keyword("EXCEPT")
-  protected val SUBSTR = Keyword("SUBSTR")
-  protected val SUBSTRING = Keyword("SUBSTRING")
-  protected val SQRT = Keyword("SQRT")
-  protected val ABS = Keyword("ABS")
 
   // Use reflection to find the reserved words defined in this class.
   protected val reservedWords =
@@ -183,17 +184,15 @@ class SqlParser extends StandardTokenParsers with 
PackratParsers {
     }
 
   protected lazy val cache: Parser[LogicalPlan] =
-    CACHE ~ TABLE ~> ident ~ opt(AS ~> select) <~ opt(";") ^^ {
-      case tableName ~ None => 
-        CacheCommand(tableName, true)
-      case tableName ~ Some(plan) =>
-        CacheTableAsSelectCommand(tableName, plan)
+    CACHE ~> opt(LAZY) ~ (TABLE ~> ident) ~ opt(AS ~> select) <~ opt(";") ^^ {
+      case isLazy ~ tableName ~ plan =>
+        CacheTableCommand(tableName, plan, isLazy.isDefined)
     }
-    
+
   protected lazy val unCache: Parser[LogicalPlan] =
     UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ {
-      case tableName => CacheCommand(tableName, false)
-    }    
+      case tableName => UncacheTableCommand(tableName)
+    }
 
   protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, 
",")
 
@@ -283,7 +282,7 @@ class SqlParser extends StandardTokenParsers with 
PackratParsers {
     termExpression ~ ">=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => 
GreaterThanOrEqual(e1, e2) } |
     termExpression ~ "!=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => 
Not(EqualTo(e1, e2)) } |
     termExpression ~ "<>" ~ termExpression ^^ { case e1 ~ _ ~ e2 => 
Not(EqualTo(e1, e2)) } |
-    termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ { 
+    termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ {
       case e ~ _ ~ el ~ _  ~ eu => And(GreaterThanOrEqual(e, el), 
LessThanOrEqual(e, eu))
     } |
     termExpression ~ RLIKE ~ termExpression ^^ { case e1 ~ _ ~ e2 => RLike(e1, 
e2) } |

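The `opt(LAZY)` combinator above yields an `Option`, so the presence of the
LAZY keyword reduces to `isLazy.isDefined`. A minimal standalone sketch of the
same pattern (a hypothetical example built on scala-parser-combinators, not
part of this commit):

    import scala.util.parsing.combinator.RegexParsers

    object CacheSyntax extends RegexParsers {
      case class CacheStmt(table: String, isLazy: Boolean)

      // CACHE [LAZY] TABLE <identifier>
      val cache: Parser[CacheStmt] =
        "CACHE" ~> opt("LAZY") ~ ("TABLE" ~> """\w+""".r) ^^ {
          case isLazy ~ tableName => CacheStmt(tableName, isLazy.isDefined)
        }
    }

    // CacheSyntax.parseAll(CacheSyntax.cache, "CACHE TABLE t")       // CacheStmt(t,false)
    // CacheSyntax.parseAll(CacheSyntax.cache, "CACHE LAZY TABLE t")  // CacheStmt(t,true)
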
http://git-wip-us.apache.org/repos/asf/spark/blob/34b97a06/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 616f1e2..2059a91 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -87,7 +87,7 @@ class SimpleCatalog(val caseSensitive: Boolean) extends 
Catalog {
       tableName: String,
       alias: Option[String] = None): LogicalPlan = {
     val (dbName, tblName) = processDatabaseAndTableName(databaseName, 
tableName)
-    val table = tables.get(tblName).getOrElse(sys.error(s"Table Not Found: 
$tableName"))
+    val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: 
$tableName"))
     val tableWithQualifiers = Subquery(tblName, table)
 
     // If an alias was specified by the lookup, wrap the plan in a subquery so 
that attributes are

http://git-wip-us.apache.org/repos/asf/spark/blob/34b97a06/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index 8366639..9a3848c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -56,9 +56,15 @@ case class ExplainCommand(plan: LogicalPlan, extended: 
Boolean = false) extends
 }
 
 /**
- * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" 
command.
+ * Returned for the "CACHE TABLE tableName [AS SELECT ...]" command.
  */
-case class CacheCommand(tableName: String, doCache: Boolean) extends Command
+case class CacheTableCommand(tableName: String, plan: Option[LogicalPlan], 
isLazy: Boolean)
+  extends Command
+
+/**
+ * Returned for the "UNCACHE TABLE tableName" command.
+ */
+case class UncacheTableCommand(tableName: String) extends Command
 
 /**
  * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
@@ -75,8 +81,3 @@ case class DescribeCommand(
     AttributeReference("data_type", StringType, nullable = false)(),
     AttributeReference("comment", StringType, nullable = false)())
 }
-
-/**
- * Returned for the "CACHE TABLE tableName AS SELECT .." command.
- */
-case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) 
extends Command

http://git-wip-us.apache.org/repos/asf/spark/blob/34b97a06/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index aebdbb6..3bf7382 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -91,14 +91,10 @@ private[sql] trait CacheManager {
   }
 
   /** Removes the data for the given SchemaRDD from the cache */
-  private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = false): 
Unit = writeLock {
+  private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): 
Unit = writeLock {
     val planToCache = query.queryExecution.optimizedPlan
     val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache))
-
-    if (dataIndex < 0) {
-      throw new IllegalArgumentException(s"Table $query is not cached.")
-    }
-
+    require(dataIndex >= 0, s"Table $query is not cached.")
     
cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
     cachedData.remove(dataIndex)
   }
@@ -135,5 +131,4 @@ private[sql] trait CacheManager {
       case _ =>
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/34b97a06/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index cec82a7..4f79173 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -111,7 +111,7 @@ private[sql] case class InMemoryRelation(
 
   override def newInstance() = {
     new InMemoryRelation(
-      output.map(_.newInstance),
+      output.map(_.newInstance()),
       useCompression,
       batchSize,
       storageLevel,

http://git-wip-us.apache.org/repos/asf/spark/blob/34b97a06/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cf93d5a..5c16d0c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -304,10 +304,10 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
         Seq(execution.SetCommand(key, value, plan.output)(context))
       case logical.ExplainCommand(logicalPlan, extended) =>
         Seq(execution.ExplainCommand(logicalPlan, plan.output, 
extended)(context))
-      case logical.CacheCommand(tableName, cache) =>
-        Seq(execution.CacheCommand(tableName, cache)(context))
-      case logical.CacheTableAsSelectCommand(tableName, plan) =>
-        Seq(execution.CacheTableAsSelectCommand(tableName, plan))
+      case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
+        Seq(execution.CacheTableCommand(tableName, optPlan, isLazy))
+      case logical.UncacheTableCommand(tableName) =>
+        Seq(execution.UncacheTableCommand(tableName))
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/34b97a06/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index f88099e..d49633c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -138,49 +138,54 @@ case class ExplainCommand(
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class CacheCommand(tableName: String, doCache: Boolean)(@transient 
context: SQLContext)
+case class CacheTableCommand(
+    tableName: String,
+    plan: Option[LogicalPlan],
+    isLazy: Boolean)
   extends LeafNode with Command {
 
   override protected lazy val sideEffectResult = {
-    if (doCache) {
-      context.cacheTable(tableName)
-    } else {
-      context.uncacheTable(tableName)
+    import sqlContext._
+
+    plan.foreach(_.registerTempTable(tableName))
+    val schemaRDD = table(tableName)
+    schemaRDD.cache()
+
+    if (!isLazy) {
+      // Performs eager caching
+      schemaRDD.count()
     }
+
     Seq.empty[Row]
   }
 
   override def output: Seq[Attribute] = Seq.empty
 }
 
+
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
-    @transient context: SQLContext)
-  extends LeafNode with Command {
-
+case class UncacheTableCommand(tableName: String) extends LeafNode with 
Command {
   override protected lazy val sideEffectResult: Seq[Row] = {
-    Row("# Registered as a temporary table", null, null) +:
-      child.output.map(field => Row(field.name, field.dataType.toString, null))
+    sqlContext.table(tableName).unpersist()
+    Seq.empty[Row]
   }
+
+  override def output: Seq[Attribute] = Seq.empty
 }
 
 /**
  * :: DeveloperApi ::
  */
 @DeveloperApi
-case class CacheTableAsSelectCommand(tableName: String, logicalPlan: 
LogicalPlan)
+case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
+    @transient context: SQLContext)
   extends LeafNode with Command {
-  
-  override protected[sql] lazy val sideEffectResult = {
-    import sqlContext._
-    logicalPlan.registerTempTable(tableName)
-    cacheTable(tableName) 
-    Seq.empty[Row]
-  }
 
-  override def output: Seq[Attribute] = Seq.empty  
-  
+  override protected lazy val sideEffectResult: Seq[Row] = {
+    Row("# Registered as a temporary table", null, null) +:
+      child.output.map(field => Row(field.name, field.dataType.toString, null))
+  }
 }

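Note how `CacheTableCommand` implements eagerness: `cache()` only marks the
table's in-memory relation, and the immediate `count()` forces the columnar
buffers to be built. The user-level equivalent (a sketch, assuming a table `t`
registered with `sqlContext`) is:

    val rdd = sqlContext.table("t")
    rdd.cache()   // marks the table for in-memory columnar caching (lazy by itself)
    rdd.count()   // materializes the cached columnar buffers right away
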
http://git-wip-us.apache.org/repos/asf/spark/blob/34b97a06/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 957388e..1e624f9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -18,30 +18,39 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.columnar.{InMemoryRelation, 
InMemoryColumnarTableScan}
-import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, 
InMemoryRelation}
+import org.apache.spark.sql.test.TestSQLContext._
+import org.apache.spark.storage.RDDBlockId
 
 case class BigData(s: String)
 
 class CachedTableSuite extends QueryTest {
-  import TestSQLContext._
   TestData // Load test tables.
 
-  /**
-   * Throws a test failed exception when the number of cached tables differs 
from the expected
-   * number.
-   */
   def assertCached(query: SchemaRDD, numCachedTables: Int = 1): Unit = {
     val planWithCaching = query.queryExecution.withCachedData
     val cachedData = planWithCaching collect {
       case cached: InMemoryRelation => cached
     }
 
-    if (cachedData.size != numCachedTables) {
-      fail(
-        s"Expected query to contain $numCachedTables, but it actually had 
${cachedData.size}\n" +
+    assert(
+      cachedData.size == numCachedTables,
+      s"Expected query to contain $numCachedTables, but it actually had 
${cachedData.size}\n" +
         planWithCaching)
-    }
+  }
+
+  def rddIdOf(tableName: String): Int = {
+    val executedPlan = table(tableName).queryExecution.executedPlan
+    executedPlan.collect {
+      case InMemoryColumnarTableScan(_, _, relation) =>
+        relation.cachedColumnBuffers.id
+      case _ =>
+        fail(s"Table $tableName is not cached\n" + executedPlan)
+    }.head
+  }
+
+  def isMaterialized(rddId: Int): Boolean = {
+    sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
   }
 
   test("too big for memory") {
@@ -52,10 +61,33 @@ class CachedTableSuite extends QueryTest {
     uncacheTable("bigData")
   }
 
-  test("calling .cache() should use inmemory columnar caching") {
+  test("calling .cache() should use in-memory columnar caching") {
     table("testData").cache()
+    assertCached(table("testData"))
+  }
+
+  test("calling .unpersist() should drop in-memory columnar cache") {
+    table("testData").cache()
+    table("testData").count()
+    table("testData").unpersist(true)
+    assertCached(table("testData"), 0)
+  }
+
+  test("isCached") {
+    cacheTable("testData")
 
     assertCached(table("testData"))
+    assert(table("testData").queryExecution.withCachedData match {
+      case _: InMemoryRelation => true
+      case _ => false
+    })
+
+    uncacheTable("testData")
+    assert(!isCached("testData"))
+    assert(table("testData").queryExecution.withCachedData match {
+      case _: InMemoryRelation => false
+      case _ => true
+    })
   }
 
   test("SPARK-1669: cacheTable should be idempotent") {
@@ -64,32 +96,27 @@ class CachedTableSuite extends QueryTest {
     cacheTable("testData")
     assertCached(table("testData"))
 
-    cacheTable("testData")
-    table("testData").queryExecution.analyzed match {
-      case InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) =>
-        fail("cacheTable is not idempotent")
+    assertResult(1, "InMemoryRelation not found, testData should have been 
cached") {
+      table("testData").queryExecution.withCachedData.collect {
+        case r: InMemoryRelation => r
+      }.size
+    }
 
-      case _ =>
+    cacheTable("testData")
+    assertResult(0, "Double InMemoryRelations found, cacheTable() is not 
idempotent") {
+      table("testData").queryExecution.withCachedData.collect {
+        case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => 
r
+      }.size
     }
   }
 
   test("read from cached table and uncache") {
     cacheTable("testData")
-
-    checkAnswer(
-      table("testData"),
-      testData.collect().toSeq
-    )
-
+    checkAnswer(table("testData"), testData.collect().toSeq)
     assertCached(table("testData"))
 
     uncacheTable("testData")
-
-    checkAnswer(
-      table("testData"),
-      testData.collect().toSeq
-    )
-
+    checkAnswer(table("testData"), testData.collect().toSeq)
     assertCached(table("testData"), 0)
   }
 
@@ -99,10 +126,12 @@ class CachedTableSuite extends QueryTest {
     }
   }
 
-  test("SELECT Star Cached Table") {
+  test("SELECT star from cached table") {
     sql("SELECT * FROM testData").registerTempTable("selectStar")
     cacheTable("selectStar")
-    sql("SELECT * FROM selectStar WHERE key = 1").collect()
+    checkAnswer(
+      sql("SELECT * FROM selectStar WHERE key = 1"),
+      Seq(Row(1, "1")))
     uncacheTable("selectStar")
   }
 
@@ -120,23 +149,57 @@ class CachedTableSuite extends QueryTest {
     sql("CACHE TABLE testData")
     assertCached(table("testData"))
 
-    assert(isCached("testData"), "Table 'testData' should be cached")
+    val rddId = rddIdOf("testData")
+    assert(
+      isMaterialized(rddId),
+      "Eagerly cached in-memory table should have already been materialized")
 
     sql("UNCACHE TABLE testData")
-    assertCached(table("testData"), 0)
     assert(!isCached("testData"), "Table 'testData' should not be cached")
+    assert(!isMaterialized(rddId), "Uncached in-memory table should have been 
unpersisted")
   }
-  
-  test("CACHE TABLE tableName AS SELECT Star Table") {
+
+  test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
     sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
-    sql("SELECT * FROM testCacheTable WHERE key = 1").collect()
-    assert(isCached("testCacheTable"), "Table 'testCacheTable' should be 
cached")
+    assertCached(table("testCacheTable"))
+
+    val rddId = rddIdOf("testCacheTable")
+    assert(
+      isMaterialized(rddId),
+      "Eagerly cached in-memory table should have already been materialized")
+
     uncacheTable("testCacheTable")
+    assert(!isMaterialized(rddId), "Uncached in-memory table should have been 
unpersisted")
   }
-  
-  test("'CACHE TABLE tableName AS SELECT ..'") {
-    sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
-    assert(isCached("testCacheTable"), "Table 'testCacheTable' should be 
cached")
+
+  test("CACHE TABLE tableName AS SELECT ...") {
+    sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
+    assertCached(table("testCacheTable"))
+
+    val rddId = rddIdOf("testCacheTable")
+    assert(
+      isMaterialized(rddId),
+      "Eagerly cached in-memory table should have already been materialized")
+
     uncacheTable("testCacheTable")
+    assert(!isMaterialized(rddId), "Uncached in-memory table should have been 
unpersisted")
+  }
+
+  test("CACHE LAZY TABLE tableName") {
+    sql("CACHE LAZY TABLE testData")
+    assertCached(table("testData"))
+
+    val rddId = rddIdOf("testData")
+    assert(
+      !isMaterialized(rddId),
+      "Lazily cached in-memory table shouldn't be materialized eagerly")
+
+    sql("SELECT COUNT(*) FROM testData").collect()
+    assert(
+      isMaterialized(rddId),
+      "Lazily cached in-memory table should have been materialized")
+
+    uncacheTable("testData")
+    assert(!isMaterialized(rddId), "Uncached in-memory table should have been 
unpersisted")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/34b97a06/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
index e7e1cb9..c5844e9 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
@@ -24,11 +24,11 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
 
 /**
- * A parser that recognizes all HiveQL constructs together with several Spark 
SQL specific 
+ * A parser that recognizes all HiveQL constructs together with several Spark 
SQL specific
  * extensions like CACHE TABLE and UNCACHE TABLE.
  */
-private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with 
PackratParsers {  
-  
+private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with 
PackratParsers {
+
   def apply(input: String): LogicalPlan = {
     // Special-case out set commands since the value fields can be
     // complex to handle without RegexParsers. Also this approach
@@ -54,16 +54,17 @@ private[hive] class ExtendedHiveQlParser extends 
StandardTokenParsers with Packr
 
   protected case class Keyword(str: String)
 
-  protected val CACHE = Keyword("CACHE")
-  protected val SET = Keyword("SET")
   protected val ADD = Keyword("ADD")
-  protected val JAR = Keyword("JAR")
-  protected val TABLE = Keyword("TABLE")
   protected val AS = Keyword("AS")
-  protected val UNCACHE = Keyword("UNCACHE")
-  protected val FILE = Keyword("FILE")
+  protected val CACHE = Keyword("CACHE")
   protected val DFS = Keyword("DFS")
+  protected val FILE = Keyword("FILE")
+  protected val JAR = Keyword("JAR")
+  protected val LAZY = Keyword("LAZY")
+  protected val SET = Keyword("SET")
   protected val SOURCE = Keyword("SOURCE")
+  protected val TABLE = Keyword("TABLE")
+  protected val UNCACHE = Keyword("UNCACHE")
 
   protected implicit def asParser(k: Keyword): Parser[String] =
     lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
@@ -79,57 +80,56 @@ private[hive] class ExtendedHiveQlParser extends 
StandardTokenParsers with Packr
 
   override val lexical = new SqlLexical(reservedWords)
 
-  protected lazy val query: Parser[LogicalPlan] = 
+  protected lazy val query: Parser[LogicalPlan] =
     cache | uncache | addJar | addFile | dfs | source | hiveQl
 
   protected lazy val hiveQl: Parser[LogicalPlan] =
-    remainingQuery ^^ {
-      case r => HiveQl.createPlan(r.trim())
+    restInput ^^ {
+      case statement => HiveQl.createPlan(statement.trim())
     }
 
-  /** It returns all remaining query */
-  protected lazy val remainingQuery: Parser[String] = new Parser[String] {
+  // Returns the whole input string
+  protected lazy val wholeInput: Parser[String] = new Parser[String] {
     def apply(in: Input) =
-      Success(
-        in.source.subSequence(in.offset, in.source.length).toString,
-        in.drop(in.source.length()))
+      Success(in.source.toString, in.drop(in.source.length()))
   }
 
-  /** It returns all query */
-  protected lazy val allQuery: Parser[String] = new Parser[String] {
+  // Returns the rest of the input string that is not yet parsed
+  protected lazy val restInput: Parser[String] = new Parser[String] {
     def apply(in: Input) =
-      Success(in.source.toString, in.drop(in.source.length()))
+      Success(
+        in.source.subSequence(in.offset, in.source.length).toString,
+        in.drop(in.source.length()))
   }
 
   protected lazy val cache: Parser[LogicalPlan] =
-    CACHE ~ TABLE ~> ident ~ opt(AS ~> hiveQl) ^^ {
-      case tableName ~ None => CacheCommand(tableName, true)
-      case tableName ~ Some(plan) =>
-        CacheTableAsSelectCommand(tableName, plan)
+    CACHE ~> opt(LAZY) ~ (TABLE ~> ident) ~ opt(AS ~> hiveQl) ^^ {
+      case isLazy ~ tableName ~ plan =>
+        CacheTableCommand(tableName, plan, isLazy.isDefined)
     }
 
   protected lazy val uncache: Parser[LogicalPlan] =
     UNCACHE ~ TABLE ~> ident ^^ {
-      case tableName => CacheCommand(tableName, false)
+      case tableName => UncacheTableCommand(tableName)
     }
 
   protected lazy val addJar: Parser[LogicalPlan] =
-    ADD ~ JAR ~> remainingQuery ^^ {
-      case rq => AddJar(rq.trim())
+    ADD ~ JAR ~> restInput ^^ {
+      case jar => AddJar(jar.trim())
     }
 
   protected lazy val addFile: Parser[LogicalPlan] =
-    ADD ~ FILE ~> remainingQuery ^^ {
-      case rq => AddFile(rq.trim())
+    ADD ~ FILE ~> restInput ^^ {
+      case file => AddFile(file.trim())
     }
 
   protected lazy val dfs: Parser[LogicalPlan] =
-    DFS ~> allQuery ^^ {
-      case aq => NativeCommand(aq.trim())
+    DFS ~> wholeInput ^^ {
+      case command => NativeCommand(command.trim())
     }
 
   protected lazy val source: Parser[LogicalPlan] =
-    SOURCE ~> remainingQuery ^^ {
-      case rq => SourceCommand(rq.trim())
+    SOURCE ~> restInput ^^ {
+      case file => SourceCommand(file.trim())
     }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/34b97a06/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index c0e6939..a4354c1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, 
NativeCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, 
LogicalPlan, NativeCommand}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.SQLConf
@@ -67,7 +67,7 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
   lazy val metastorePath = 
getTempFilePath("sparkHiveMetastore").getCanonicalPath
 
   /** Sets up the system initially or after a RESET command */
-  protected def configure() {
+  protected def configure(): Unit = {
     setConf("javax.jdo.option.ConnectionURL",
       s"jdbc:derby:;databaseName=$metastorePath;create=true")
     setConf("hive.metastore.warehouse.dir", warehousePath)
@@ -154,7 +154,7 @@ class TestHiveContext(sc: SparkContext) extends 
HiveContext(sc) {
     override lazy val analyzed = {
       val describedTables = logical match {
         case NativeCommand(describedTable(tbl)) => tbl :: Nil
-        case CacheCommand(tbl, _) => tbl :: Nil
+        case CacheTableCommand(tbl, _, _) => tbl :: Nil
         case _ => Nil
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/34b97a06/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 158cfb5..2060e1f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -17,13 +17,13 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.{QueryTest, SchemaRDD}
-import org.apache.spark.sql.columnar.{InMemoryRelation, 
InMemoryColumnarTableScan}
+import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, 
InMemoryRelation}
 import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.{QueryTest, SchemaRDD}
+import org.apache.spark.storage.RDDBlockId
 
 class CachedTableSuite extends QueryTest {
-  import TestHive._
-
   /**
    * Throws a test failed exception when the number of cached tables differs 
from the expected
    * number.
@@ -34,11 +34,24 @@ class CachedTableSuite extends QueryTest {
       case cached: InMemoryRelation => cached
     }
 
-    if (cachedData.size != numCachedTables) {
-      fail(
-        s"Expected query to contain $numCachedTables, but it actually had 
${cachedData.size}\n" +
-          planWithCaching)
-    }
+    assert(
+      cachedData.size == numCachedTables,
+      s"Expected query to contain $numCachedTables, but it actually had 
${cachedData.size}\n" +
+        planWithCaching)
+  }
+
+  def rddIdOf(tableName: String): Int = {
+    val executedPlan = table(tableName).queryExecution.executedPlan
+    executedPlan.collect {
+      case InMemoryColumnarTableScan(_, _, relation) =>
+        relation.cachedColumnBuffers.id
+      case _ =>
+        fail(s"Table $tableName is not cached\n" + executedPlan)
+    }.head
+  }
+
+  def isMaterialized(rddId: Int): Boolean = {
+    sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
   }
 
   test("cache table") {
@@ -102,16 +115,47 @@ class CachedTableSuite extends QueryTest {
     assert(!TestHive.isCached("src"), "Table 'src' should not be cached")
   }
 
-  test("CACHE TABLE AS SELECT") {
-    assertCached(sql("SELECT * FROM src"), 0)
-    sql("CACHE TABLE test AS SELECT key FROM src")
+  test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
+    sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
+    assertCached(table("testCacheTable"))
 
-    checkAnswer(
-      sql("SELECT * FROM test"),
-      sql("SELECT key FROM src").collect().toSeq)
+    val rddId = rddIdOf("testCacheTable")
+    assert(
+      isMaterialized(rddId),
+      "Eagerly cached in-memory table should have already been materialized")
 
-    assertCached(sql("SELECT * FROM test"))
+    uncacheTable("testCacheTable")
+    assert(!isMaterialized(rddId), "Uncached in-memory table should have been 
unpersisted")
+  }
+
+  test("CACHE TABLE tableName AS SELECT ...") {
+    sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10")
+    assertCached(table("testCacheTable"))
+
+    val rddId = rddIdOf("testCacheTable")
+    assert(
+      isMaterialized(rddId),
+      "Eagerly cached in-memory table should have already been materialized")
+
+    uncacheTable("testCacheTable")
+    assert(!isMaterialized(rddId), "Uncached in-memory table should have been 
unpersisted")
+  }
 
-    assertCached(sql("SELECT * FROM test JOIN test"), 2)
+  test("CACHE LAZY TABLE tableName") {
+    sql("CACHE LAZY TABLE src")
+    assertCached(table("src"))
+
+    val rddId = rddIdOf("src")
+    assert(
+      !isMaterialized(rddId),
+      "Lazily cached in-memory table shouldn't be materialized eagerly")
+
+    sql("SELECT COUNT(*) FROM src").collect()
+    assert(
+      isMaterialized(rddId),
+      "Lazily cached in-memory table should have been materialized")
+
+    uncacheTable("src")
+    assert(!isMaterialized(rddId), "Uncached in-memory table should have been 
unpersisted")
   }
 }

