Repository: spark
Updated Branches:
  refs/heads/master 6f54dee66 -> c51ab37fa


[SPARK-5833] [SQL] Adds REFRESH TABLE command

Lifts `HiveMetastoreCatalog.refreshTable` to `Catalog`. Adds a `RefreshTable`
command to refresh (possibly cached) metadata of external data source tables.
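
For reference, a minimal usage sketch of the new command (the table names
below are hypothetical, not part of this patch):

    // Refresh a table in the default database:
    sqlContext.sql("REFRESH TABLE users")
    // Or qualify the database explicitly:
    sqlContext.sql("REFRESH TABLE mydb.users")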


Author: Cheng Lian <l...@databricks.com>

Closes #4624 from liancheng/refresh-table and squashes the following commits:

8d1aa4c [Cheng Lian] Adds REFRESH TABLE command


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c51ab37f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c51ab37f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c51ab37f

Branch: refs/heads/master
Commit: c51ab37faddf4ede23243058dfb388e74a192552
Parents: 6f54dee
Author: Cheng Lian <l...@databricks.com>
Authored: Mon Feb 16 12:52:05 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Feb 16 12:52:05 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Catalog.scala   | 10 ++++
 .../org/apache/spark/sql/sources/ddl.scala      | 52 +++++++++++---------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  2 +-
 4 files changed, 42 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c51ab37f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index bf97215..9e6e291 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -46,6 +46,8 @@ trait Catalog {
    */
   def getTables(databaseName: Option[String]): Seq[(String, Boolean)]
 
+  def refreshTable(databaseName: String, tableName: String): Unit
+
   def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
 
   def unregisterTable(tableIdentifier: Seq[String]): Unit
@@ -119,6 +121,10 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
       case (name, _) => (name, true)
     }.toSeq
   }
+
+  override def refreshTable(databaseName: String, tableName: String): Unit = {
+    throw new UnsupportedOperationException
+  }
 }
 
 /**
@@ -224,4 +230,8 @@ object EmptyCatalog extends Catalog {
   }
 
   override def unregisterAllTables(): Unit = {}
+
+  override def refreshTable(databaseName: String, tableName: String): Unit = {
+    throw new UnsupportedOperationException
+  }
 }
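
As a sketch of what the new `refreshTable` contract asks of a concrete
catalog, consider the following. This is illustrative only; the class and
cache names are hypothetical, not part of this patch:

    import scala.collection.mutable
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Hypothetical catalog that backs refreshTable with a mutable plan cache.
    class CachingCatalog extends SimpleCatalog(caseSensitive = false) {
      private val planCache = mutable.Map.empty[(String, String), LogicalPlan]

      override def refreshTable(databaseName: String, tableName: String): Unit = {
        // Drop the stale entry so the next lookup reloads fresh metadata.
        planCache.remove((databaseName, tableName))
      }
    }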

http://git-wip-us.apache.org/repos/asf/spark/blob/c51ab37f/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 8cac9c0..1b5e8c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{Row, AttributeReference}
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -66,6 +66,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
   protected val EXTENDED = Keyword("EXTENDED")
   protected val AS = Keyword("AS")
   protected val COMMENT = Keyword("COMMENT")
+  protected val REFRESH = Keyword("REFRESH")
 
   // Data types.
   protected val STRING = Keyword("STRING")
@@ -85,7 +86,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
   protected val MAP = Keyword("MAP")
   protected val STRUCT = Keyword("STRUCT")
 
-  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable
+  protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
 
   protected def start: Parser[LogicalPlan] = ddl
 
@@ -104,9 +105,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
    * AS SELECT ...
    */
   protected lazy val createTable: Parser[LogicalPlan] =
-  (
-    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident
-      ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
+    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
+      tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
       case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
         if (temp.isDefined && allowExisting.isDefined) {
           throw new DDLException(
@@ -145,8 +145,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
             allowExisting.isDefined,
             managedIfNoPath = false)
         }
-      }
-  )
+    }
 
   protected lazy val tableCols: Parser[Seq[StructField]] =  "(" ~> repsep(column, ",") <~ ")"
 
@@ -166,6 +165,12 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
         DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
    }
 
+  protected lazy val refreshTable: Parser[LogicalPlan] =
+    REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
+      case maybeDatabaseName ~ tableName =>
+        RefreshTable(maybeDatabaseName.getOrElse("default"), tableName)
+    }
+
   protected lazy val options: Parser[Map[String, String]] =
     "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
 
@@ -177,10 +182,10 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
     ident ~ dataType ~ (COMMENT ~> stringLit).?  ^^ { case columnName ~ typ ~ cm =>
       val meta = cm match {
         case Some(comment) =>
-          new MetadataBuilder().putString(COMMENT.str.toLowerCase(), comment).build()
+          new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
         case None => Metadata.empty
       }
-      StructField(columnName, typ, true, meta)
+      StructField(columnName, typ, nullable = true, meta)
     }
 
   protected lazy val primitiveType: Parser[DataType] =
@@ -318,24 +323,18 @@ private[sql] case class DescribeCommand(
     isExtended: Boolean) extends Command {
   override val output = Seq(
     // Column names are based on Hive.
-    AttributeReference("col_name", StringType, nullable = false, 
+    AttributeReference("col_name", StringType, nullable = false,
       new MetadataBuilder().putString("comment", "name of the column").build())(),
-    AttributeReference("data_type", StringType, nullable = false, 
+    AttributeReference("data_type", StringType, nullable = false,
       new MetadataBuilder().putString("comment", "data type of the column").build())(),
-    AttributeReference("comment", StringType, nullable = false, 
+    AttributeReference("comment", StringType, nullable = false,
       new MetadataBuilder().putString("comment", "comment of the column").build())())
 }
 
 /**
   * Used to represent the operation of create table using a data source.
-  * @param tableName
-  * @param userSpecifiedSchema
-  * @param provider
-  * @param temporary
-  * @param options
   * @param allowExisting If it is true, we will do nothing when the table already exists.
- *                      If it is false, an exception will be thrown
-  * @param managedIfNoPath
+  *                      If it is false, an exception will be thrown
   */
 private[sql] case class CreateTableUsing(
     tableName: String,
@@ -362,7 +361,7 @@ private[sql] case class CreateTableUsingAsLogicalPlan(
     options: Map[String, String],
     query: LogicalPlan) extends Command
 
-private [sql] case class CreateTempTableUsing(
+private[sql] case class CreateTempTableUsing(
     tableName: String,
     userSpecifiedSchema: Option[StructType],
     provider: String,
@@ -376,7 +375,7 @@ private [sql] case class CreateTempTableUsing(
   }
 }
 
-private [sql] case class CreateTempTableUsingAsSelect(
+private[sql] case class CreateTempTableUsingAsSelect(
     tableName: String,
     provider: String,
     mode: SaveMode,
@@ -393,6 +392,15 @@ private [sql] case class CreateTempTableUsingAsSelect(
   }
 }
 
+private[sql] case class RefreshTable(databaseName: String, tableName: String)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    sqlContext.catalog.refreshTable(databaseName, tableName)
+    Seq.empty[Row]
+  }
+}
+
 /**
  * Builds a map in which keys are case insensitive
  */
@@ -408,7 +416,7 @@ protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[St
 
   override def iterator: Iterator[(String, String)] = baseMap.iterator
 
-  override def -(key: String): Map[String, String] = baseMap - key.toLowerCase()
+  override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
 }
 
 /**
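
Given the grammar above, the new rule's behavior can be summarized as
follows (the table and database names are illustrative):

    // "REFRESH TABLE jsonTable"      parses to RefreshTable("default", "jsonTable")
    // "REFRESH TABLE mydb.jsonTable" parses to RefreshTable("mydb", "jsonTable")
    // When executed, RefreshTable delegates to sqlContext.catalog.refreshTable
    // and returns no rows.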

http://git-wip-us.apache.org/repos/asf/spark/blob/c51ab37f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 12f86a0..580c570 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -91,7 +91,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
   }
 
-  def refreshTable(databaseName: String, tableName: String): Unit = {
+  override def refreshTable(databaseName: String, tableName: String): Unit = {
     cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
   }
 

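The Hive catalog satisfies the new contract through its Guava LoadingCache
(built above with CacheBuilder). The following standalone sketch shows the
same refresh pattern; the key type and loader body are illustrative, not
Spark's actual cache:

    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    // Illustrative stand-in for a metastore lookup keyed by "db.table".
    val loader = new CacheLoader[String, String] {
      override def load(key: String): String = s"metadata for $key"
    }
    val cache: LoadingCache[String, String] =
      CacheBuilder.newBuilder().maximumSize(1000).build(loader)

    cache.get("default.users")     // loads through the loader and caches the result
    cache.refresh("default.users") // re-runs the loader, replacing the cached entry
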
http://git-wip-us.apache.org/repos/asf/spark/blob/c51ab37f/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index addf887..375aae5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -177,7 +177,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       sql("SELECT * FROM jsonTable"),
       Row("a1", "b1"))
 
-    refreshTable("jsonTable")
+    sql("REFRESH TABLE jsonTable")
 
     // Check that the refresh worked
     checkAnswer(

