Repository: spark Updated Branches: refs/heads/master 6f54dee66 -> c51ab37fa
[SPARK-5833] [SQL] Adds REFRESH TABLE command Lifts `HiveMetastoreCatalog.refreshTable` to `Catalog`. Adds `RefreshTable` command to refresh (possibly cached) metadata in external data sources tables. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/4624) <!-- Reviewable:end --> Author: Cheng Lian <l...@databricks.com> Closes #4624 from liancheng/refresh-table and squashes the following commits: 8d1aa4c [Cheng Lian] Adds REFRESH TABLE command Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c51ab37f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c51ab37f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c51ab37f Branch: refs/heads/master Commit: c51ab37faddf4ede23243058dfb388e74a192552 Parents: 6f54dee Author: Cheng Lian <l...@databricks.com> Authored: Mon Feb 16 12:52:05 2015 -0800 Committer: Michael Armbrust <mich...@databricks.com> Committed: Mon Feb 16 12:52:05 2015 -0800 ---------------------------------------------------------------------- .../spark/sql/catalyst/analysis/Catalog.scala | 10 ++++ .../org/apache/spark/sql/sources/ddl.scala | 52 +++++++++++--------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 2 +- 4 files changed, 42 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c51ab37f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index bf97215..9e6e291 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -46,6 +46,8 @@ trait Catalog { */ def getTables(databaseName: Option[String]): Seq[(String, Boolean)] + def refreshTable(databaseName: String, tableName: String): Unit + def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit def unregisterTable(tableIdentifier: Seq[String]): Unit @@ -119,6 +121,10 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog { case (name, _) => (name, true) }.toSeq } + + override def refreshTable(databaseName: String, tableName: String): Unit = { + throw new UnsupportedOperationException + } } /** @@ -224,4 +230,8 @@ object EmptyCatalog extends Catalog { } override def unregisterAllTables(): Unit = {} + + override def refreshTable(databaseName: String, tableName: String): Unit = { + throw new UnsupportedOperationException + } } http://git-wip-us.apache.org/repos/asf/spark/blob/c51ab37f/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 8cac9c0..1b5e8c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.AbstractSparkSQLParser import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.{Row, AttributeReference} import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -66,6 +66,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { protected val EXTENDED = Keyword("EXTENDED") protected val AS = Keyword("AS") protected val COMMENT = Keyword("COMMENT") + protected val REFRESH = Keyword("REFRESH") // Data types. protected val STRING = Keyword("STRING") @@ -85,7 +86,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { protected val MAP = Keyword("MAP") protected val STRUCT = Keyword("STRUCT") - protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable + protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable protected def start: Parser[LogicalPlan] = ddl @@ -104,9 +105,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { * AS SELECT ... */ protected lazy val createTable: Parser[LogicalPlan] = - ( - (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident - ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ { + (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~ + tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ { case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query => if (temp.isDefined && allowExisting.isDefined) { throw new DDLException( @@ -145,8 +145,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { allowExisting.isDefined, managedIfNoPath = false) } - } - ) + } protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")" @@ -166,6 +165,12 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined) } + protected lazy val refreshTable: Parser[LogicalPlan] = + REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ { + case maybeDatabaseName ~ tableName => + RefreshTable(maybeDatabaseName.getOrElse("default"), tableName) + } + protected lazy val options: Parser[Map[String, String]] = "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap } @@ -177,10 +182,10 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm => val meta = cm match { case Some(comment) => - new MetadataBuilder().putString(COMMENT.str.toLowerCase(), comment).build() + new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build() case None => Metadata.empty } - StructField(columnName, typ, true, meta) + StructField(columnName, typ, nullable = true, meta) } protected lazy val primitiveType: Parser[DataType] = @@ -318,24 +323,18 @@ private[sql] case class DescribeCommand( isExtended: Boolean) extends Command { override val output = Seq( // Column names are based on Hive. - AttributeReference("col_name", StringType, nullable = false, + AttributeReference("col_name", StringType, nullable = false, new MetadataBuilder().putString("comment", "name of the column").build())(), - AttributeReference("data_type", StringType, nullable = false, + AttributeReference("data_type", StringType, nullable = false, new MetadataBuilder().putString("comment", "data type of the column").build())(), - AttributeReference("comment", StringType, nullable = false, + AttributeReference("comment", StringType, nullable = false, new MetadataBuilder().putString("comment", "comment of the column").build())()) } /** * Used to represent the operation of create table using a data source. - * @param tableName - * @param userSpecifiedSchema - * @param provider - * @param temporary - * @param options * @param allowExisting If it is true, we will do nothing when the table already exists. - * If it is false, an exception will be thrown - * @param managedIfNoPath + * If it is false, an exception will be thrown */ private[sql] case class CreateTableUsing( tableName: String, @@ -362,7 +361,7 @@ private[sql] case class CreateTableUsingAsLogicalPlan( options: Map[String, String], query: LogicalPlan) extends Command -private [sql] case class CreateTempTableUsing( +private[sql] case class CreateTempTableUsing( tableName: String, userSpecifiedSchema: Option[StructType], provider: String, @@ -376,7 +375,7 @@ private [sql] case class CreateTempTableUsing( } } -private [sql] case class CreateTempTableUsingAsSelect( +private[sql] case class CreateTempTableUsingAsSelect( tableName: String, provider: String, mode: SaveMode, @@ -393,6 +392,15 @@ private [sql] case class CreateTempTableUsingAsSelect( } } +private[sql] case class RefreshTable(databaseName: String, tableName: String) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.catalog.refreshTable(databaseName, tableName) + Seq.empty[Row] + } +} + /** * Builds a map in which keys are case insensitive */ @@ -408,7 +416,7 @@ protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[St override def iterator: Iterator[(String, String)] = baseMap.iterator - override def -(key: String): Map[String, String] = baseMap - key.toLowerCase() + override def -(key: String): Map[String, String] = baseMap - key.toLowerCase } /** http://git-wip-us.apache.org/repos/asf/spark/blob/c51ab37f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 12f86a0..580c570 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -91,7 +91,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader) } - def refreshTable(databaseName: String, tableName: String): Unit = { + override def refreshTable(databaseName: String, tableName: String): Unit = { cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase) } http://git-wip-us.apache.org/repos/asf/spark/blob/c51ab37f/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index addf887..375aae5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -177,7 +177,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("SELECT * FROM jsonTable"), Row("a1", "b1")) - refreshTable("jsonTable") + sql("REFRESH TABLE jsonTable") // Check that the refresh worked checkAnswer( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org