Repository: spark
Updated Branches:
  refs/heads/master 0402bd77e -> 0266a0c8a


[SPARK-1968][SQL] SQL/HiveQL command for caching/uncaching tables

JIRA issue: [SPARK-1968](https://issues.apache.org/jira/browse/SPARK-1968)

This PR adds support for the `CACHE TABLE` and `UNCACHE TABLE` SQL/HiveQL commands for caching and uncaching tables:

```
scala> sql("CACHE TABLE src")
...
res0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
CacheCommandPhysical src, true

scala> table("src")
...
res1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[3] at RDD at SchemaRDD.scala:98
== Query Plan ==
InMemoryColumnarTableScan [key#0,value#1], (HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None), false

scala> isCached("src")
res2: Boolean = true

scala> sql("CACHE TABLE src")
...
res3: org.apache.spark.sql.SchemaRDD =
SchemaRDD[4] at RDD at SchemaRDD.scala:98
== Query Plan ==
CacheCommandPhysical src, false

scala> table("src")
...
res4: org.apache.spark.sql.SchemaRDD =
SchemaRDD[11] at RDD at SchemaRDD.scala:98
== Query Plan ==
HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None

scala> isCached("src")
res5: Boolean = false
```

The same commands also work through `hql`.
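
For example, a hypothetical `HiveContext` session (output elided; the shapes mirror the `sql` session above):

```
scala> hql("CACHE TABLE src")
...
scala> isCached("src")
res1: Boolean = true

scala> hql("UNCACHE TABLE src")
...
scala> isCached("src")
res3: Boolean = false
```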

Author: Cheng Lian <[email protected]>

Closes #1038 from liancheng/sqlCacheTable and squashes the following commits:

ecb7194 [Cheng Lian] Trimmed the SQL string before parsing special commands
6f4ce42 [Cheng Lian] Moved logical command classes to a separate file
3458a24 [Cheng Lian] Added comment for public API
f0ffacc [Cheng Lian] Added isCached() predicate
15ec6d2 [Cheng Lian] Added "(UN)CACHE TABLE" SQL/HiveQL statements


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0266a0c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0266a0c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0266a0c8

Branch: refs/heads/master
Commit: 0266a0c8a70e0fbaeb0df63031f7a750ffc31a80
Parents: 0402bd7
Author: Cheng Lian <[email protected]>
Authored: Wed Jun 11 00:06:50 2014 -0700
Committer: Michael Armbrust <[email protected]>
Committed: Wed Jun 11 00:06:50 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/SqlParser.scala   | 10 +++-
 .../catalyst/plans/logical/LogicalPlan.scala    | 35 +-----------
 .../sql/catalyst/plans/logical/commands.scala   | 60 ++++++++++++++++++++
 .../scala/org/apache/spark/sql/SQLContext.scala |  9 +++
 .../spark/sql/execution/SparkStrategies.scala   |  7 ++-
 .../apache/spark/sql/execution/commands.scala   | 23 ++++++++
 .../org/apache/spark/sql/CachedTableSuite.scala | 16 ++++++
 .../org/apache/spark/sql/hive/HiveQl.scala      | 18 +++---
 .../org/apache/spark/sql/hive/TestHive.scala    |  5 +-
 .../spark/sql/hive/CachedTableSuite.scala       | 16 ++++++
 10 files changed, 152 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0266a0c8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 36758f3..46fcfbb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -111,6 +111,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val APPROXIMATE = Keyword("APPROXIMATE")
   protected val AVG = Keyword("AVG")
   protected val BY = Keyword("BY")
+  protected val CACHE = Keyword("CACHE")
   protected val CAST = Keyword("CAST")
   protected val COUNT = Keyword("COUNT")
   protected val DESC = Keyword("DESC")
@@ -149,7 +150,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val SEMI = Keyword("SEMI")
   protected val STRING = Keyword("STRING")
   protected val SUM = Keyword("SUM")
+  protected val TABLE = Keyword("TABLE")
   protected val TRUE = Keyword("TRUE")
+  protected val UNCACHE = Keyword("UNCACHE")
   protected val UNION = Keyword("UNION")
   protected val WHERE = Keyword("WHERE")
 
@@ -189,7 +192,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
         UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
         UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
       )
-    | insert
+    | insert | cache
   )
 
   protected lazy val select: Parser[LogicalPlan] =
@@ -220,6 +223,11 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
         InsertIntoTable(r, Map[String, Option[String]](), s, overwrite)
     }
 
+  protected lazy val cache: Parser[LogicalPlan] =
+    (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ {
+      case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache)
+    }
+
   protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
 
   protected lazy val projection: Parser[Expression] =

http://git-wip-us.apache.org/repos/asf/spark/blob/0266a0c8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 7eeb98a..0933a31 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.types.{StringType, StructType}
+import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.catalyst.trees
 
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
@@ -97,39 +97,6 @@ abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
 }
 
 /**
- * A logical node that represents a non-query command to be executed by the system.  For example,
- * commands can be used by parsers to represent DDL operations.
- */
-abstract class Command extends LeafNode {
-  self: Product =>
-  def output: Seq[Attribute] = Seq.empty  // TODO: SPARK-2081 should fix this
-}
-
-/**
- * Returned for commands supported by a given parser, but not catalyst.  In general these are DDL
- * commands that are passed directly to another system.
- */
-case class NativeCommand(cmd: String) extends Command
-
-/**
- * Commands of the form "SET (key) (= value)".
- */
-case class SetCommand(key: Option[String], value: Option[String]) extends Command {
-  override def output = Seq(
-    AttributeReference("key", StringType, nullable = false)(),
-    AttributeReference("value", StringType, nullable = false)()
-  )
-}
-
-/**
- * Returned by a parser when the users only wants to see what query plan would be executed, without
- * actually performing the execution.
- */
-case class ExplainCommand(plan: LogicalPlan) extends Command {
-  override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
-}
-
-/**
  * A logical plan node with single child.
  */
 abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {

http://git-wip-us.apache.org/repos/asf/spark/blob/0266a0c8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
new file mode 100644
index 0000000..d05c965
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.types.StringType
+
+/**
+ * A logical node that represents a non-query command to be executed by the system.  For example,
+ * commands can be used by parsers to represent DDL operations.
+ */
+abstract class Command extends LeafNode {
+  self: Product =>
+  def output: Seq[Attribute] = Seq.empty  // TODO: SPARK-2081 should fix this
+}
+
+/**
+ * Returned for commands supported by a given parser, but not catalyst.  In general these are DDL
+ * commands that are passed directly to another system.
+ */
+case class NativeCommand(cmd: String) extends Command
+
+/**
+ * Commands of the form "SET (key) (= value)".
+ */
+case class SetCommand(key: Option[String], value: Option[String]) extends Command {
+  override def output = Seq(
+    AttributeReference("key", StringType, nullable = false)(),
+    AttributeReference("value", StringType, nullable = false)()
+  )
+}
+
+/**
+ * Returned by a parser when the users only wants to see what query plan would be executed, without
+ * actually performing the execution.
+ */
+case class ExplainCommand(plan: LogicalPlan) extends Command {
+  override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+}
+
+/**
+ * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
+ */
+case class CacheCommand(tableName: String, doCache: Boolean) extends Command
+

http://git-wip-us.apache.org/repos/asf/spark/blob/0266a0c8/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 021e0e8..264192e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -188,6 +188,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }
   }
 
+  /** Returns true if the table is currently cached in-memory. */
+  def isCached(tableName: String): Boolean = {
+    val relation = catalog.lookupRelation(None, tableName)
+    EliminateAnalysisOperators(relation) match {
+      case SparkLogicalPlan(_: InMemoryColumnarTableScan) => true
+      case _ => false
+    }
+  }
+
   protected[sql] class SparkPlanner extends SparkStrategies {
     val sparkContext = self.sparkContext
 

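The predicate resolves the table and pattern-matches on the resulting logical plan. A quick usage sketch (hypothetical session; `people` is assumed to be an already-registered table):

```
sqlContext.cacheTable("people")
assert(sqlContext.isCached("people"))    // relation now resolves to an in-memory columnar scan

sqlContext.uncacheTable("people")
assert(!sqlContext.isCached("people"))   // back to the original relation
```
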
http://git-wip-us.apache.org/repos/asf/spark/blob/0266a0c8/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 0455748..f2f95df 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -239,10 +239,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.SetCommand(key, value) =>
         Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
       case logical.ExplainCommand(child) =>
-        val qe = context.executePlan(child)
-        Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context))
+        val executedPlan = context.executePlan(child).executedPlan
+        Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
+      case logical.CacheCommand(tableName, cache) =>
+        Seq(execution.CacheCommandPhysical(tableName, cache)(context))
       case _ => Nil
     }
   }
-
 }
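
The strategy simply maps each logical command node to the physical operator that executes it. A simplified sketch of that dispatch shape, with stand-in types rather than the real Catalyst classes:

```
// Stand-in types; the real code matches on Catalyst logical plan nodes and
// returns physical SparkPlan operators.
sealed trait LogicalCmd
case class CacheCmd(table: String, doCache: Boolean) extends LogicalCmd
case class SetCmd(key: Option[String], value: Option[String]) extends LogicalCmd

sealed trait PhysicalOp
case class CachePhysical(table: String, doCache: Boolean) extends PhysicalOp

def plan(cmd: LogicalCmd): Seq[PhysicalOp] = cmd match {
  case CacheCmd(t, c) => Seq(CachePhysical(t, c))
  case _              => Nil  // fall through to other strategies
}
```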

http://git-wip-us.apache.org/repos/asf/spark/blob/0266a0c8/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 9364506..be26d19 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -65,3 +65,26 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
 
   override def otherCopyArgs = context :: Nil
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+  extends LeafNode {
+
+  lazy val commandSideEffect = {
+    if (doCache) {
+      context.cacheTable(tableName)
+    } else {
+      context.uncacheTable(tableName)
+    }
+  }
+
+  override def execute(): RDD[Row] = {
+    commandSideEffect
+    context.emptyResult
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
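
Note the `lazy val` here: Scala initializes a lazy val exactly once, on first access, so the cache/uncache side effect fires at most once even if `execute()` is called repeatedly. The idiom in isolation (names invented for illustration):

```
object RunOnceDemo extends App {
  class RunOnceCommand(action: () => Unit) {
    private lazy val sideEffect: Unit = action()  // evaluated once, on first access
    def execute(): Unit = sideEffect              // later calls are no-ops
  }

  val cmd = new RunOnceCommand(() => println("caching..."))
  cmd.execute()  // prints "caching..."
  cmd.execute()  // prints nothing: the lazy val is already initialized
}
```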

http://git-wip-us.apache.org/repos/asf/spark/blob/0266a0c8/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 0331f90..ebca3ad 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -70,4 +70,20 @@ class CachedTableSuite extends QueryTest {
     TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = 
b.key")
     TestSQLContext.uncacheTable("testData")
   }
+
+  test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
+    TestSQLContext.sql("CACHE TABLE testData")
+    TestSQLContext.table("testData").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => // Found evidence of caching
+      case _ => fail(s"Table 'testData' should be cached")
+    }
+    assert(TestSQLContext.isCached("testData"), "Table 'testData' should be cached")
+
+    TestSQLContext.sql("UNCACHE TABLE testData")
+    TestSQLContext.table("testData").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => fail(s"Table 'testData' should not be cached")
+      case _ => // Found evidence of uncaching
+    }
+    assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/0266a0c8/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 4e74d9b..b745d8f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -218,15 +218,19 @@ private[hive] object HiveQl {
           case Array(key, value) => // "set key=value"
             SetCommand(Some(key), Some(value))
         }
-      } else if (sql.toLowerCase.startsWith("add jar")) {
+      } else if (sql.trim.toLowerCase.startsWith("cache table")) {
+        CacheCommand(sql.drop(12).trim, true)
+      } else if (sql.trim.toLowerCase.startsWith("uncache table")) {
+        CacheCommand(sql.drop(14).trim, false)
+      } else if (sql.trim.toLowerCase.startsWith("add jar")) {
         AddJar(sql.drop(8))
-      } else if (sql.toLowerCase.startsWith("add file")) {
+      } else if (sql.trim.toLowerCase.startsWith("add file")) {
         AddFile(sql.drop(9))
-      } else if (sql.startsWith("dfs")) {
+      } else if (sql.trim.startsWith("dfs")) {
         DfsCommand(sql)
-      } else if (sql.startsWith("source")) {
+      } else if (sql.trim.startsWith("source")) {
         SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
-      } else if (sql.startsWith("!")) {
+      } else if (sql.trim.startsWith("!")) {
         ShellCommand(sql.drop(1))
       } else {
         val tree = getAst(sql)
@@ -839,11 +843,11 @@ private[hive] object HiveQl {
     case Token("TOK_FUNCTIONDI", Token(SUM(), Nil) :: arg :: Nil) => 
SumDistinct(nodeToExpr(arg))
     case Token("TOK_FUNCTION", Token(MAX(), Nil) :: arg :: Nil) => 
Max(nodeToExpr(arg))
     case Token("TOK_FUNCTION", Token(MIN(), Nil) :: arg :: Nil) => 
Min(nodeToExpr(arg))
-    
+
     /* System functions about string operations */
     case Token("TOK_FUNCTION", Token(UPPER(), Nil) :: arg :: Nil) => 
Upper(nodeToExpr(arg))
     case Token("TOK_FUNCTION", Token(LOWER(), Nil) :: arg :: Nil) => 
Lower(nodeToExpr(arg))
-    
+
     /* Casts */
     case Token("TOK_FUNCTION", Token("TOK_STRING", Nil) :: arg :: Nil) =>
       Cast(nodeToExpr(arg), StringType)
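
The magic numbers in the new branches above come from the keyword lengths: "cache table" is 11 characters, so `drop(12)` also skips the following separator, and "uncache table" is 13, hence `drop(14)`. A worked example (plain Scala, no Spark needed; note that `drop` is applied to the original string, so the counts assume no leading whitespace):

```
val cacheSql   = "CACHE TABLE src"
val uncacheSql = "UNCACHE TABLE src"

assert(cacheSql.drop(12).trim == "src")    // drops "CACHE TABLE "
assert(uncacheSql.drop(14).trim == "src")  // drops "UNCACHE TABLE "
```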

http://git-wip-us.apache.org/repos/asf/spark/blob/0266a0c8/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 041e813..9386008 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NativeCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive._
 
@@ -103,7 +103,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
   val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
     new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
   } else {
-    new File("sql" + File.separator + "hive" + File.separator + "src" + 
File.separator + "test" + 
+    new File("sql" + File.separator + "hive" + File.separator + "src" + 
File.separator + "test" +
       File.separator + "resources")
   }
 
@@ -130,6 +130,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
     override lazy val analyzed = {
       val describedTables = logical match {
         case NativeCommand(describedTable(tbl)) => tbl :: Nil
+        case CacheCommand(tbl, _) => tbl :: Nil
         case _ => Nil
       }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0266a0c8/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index f9a162e..91ac03c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -56,4 +56,20 @@ class CachedTableSuite extends HiveComparisonTest {
       TestHive.uncacheTable("src")
     }
   }
+
+  test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") {
+    TestHive.hql("CACHE TABLE src")
+    TestHive.table("src").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => // Found evidence of caching
+      case _ => fail(s"Table 'src' should be cached")
+    }
+    assert(TestHive.isCached("src"), "Table 'src' should be cached")
+
+    TestHive.hql("UNCACHE TABLE src")
+    TestHive.table("src").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => fail(s"Table 'src' should not be 
cached")
+      case _ => // Found evidence of uncaching
+    }
+    assert(!TestHive.isCached("src"), "Table 'src' should not be cached")
+  }
 }
