Repository: spark
Updated Branches:
  refs/heads/master ffcb6e055 -> 840853ed0


[SPARK-16458][SQL] SessionCatalog should support `listColumns` for temporary 
tables

## What changes were proposed in this pull request?

Temporary tables are used frequently, but `spark.catalog.listColumns` does not 
support those tables. This PR makes `SessionCatalog` support temporary table 
column listing.

**Before**
```scala
scala> spark.range(10).createOrReplaceTempView("t1")

scala> spark.catalog.listTables().collect()
res1: Array[org.apache.spark.sql.catalog.Table] = Array(Table[name=`t1`, 
tableType=`TEMPORARY`, isTemporary=`true`])

scala> spark.catalog.listColumns("t1").collect()
org.apache.spark.sql.AnalysisException: Table `t1` does not exist in database 
`default`.;
```

**After**
```
scala> spark.catalog.listColumns("t1").collect()
res2: Array[org.apache.spark.sql.catalog.Column] = Array(Column[name='id', 
description='id', dataType='bigint', nullable='false', isPartition='false', 
isBucket='false'])
```
## How was this patch tested?

Pass the Jenkins tests including a new test case.

Author: Dongjoon Hyun <dongj...@apache.org>

Closes #14114 from dongjoon-hyun/SPARK-16458.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/840853ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/840853ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/840853ed

Branch: refs/heads/master
Commit: 840853ed06d63694bf98b21a889a960aac6ac0ac
Parents: ffcb6e0
Author: Dongjoon Hyun <dongj...@apache.org>
Authored: Mon Jul 11 22:45:22 2016 +0200
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Mon Jul 11 22:45:22 2016 +0200

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 32 ++++++++++++++-----
 .../catalyst/catalog/SessionCatalogSuite.scala  | 33 ++++++++++++++++++++
 .../org/apache/spark/sql/catalog/Catalog.scala  |  3 +-
 .../apache/spark/sql/internal/CatalogImpl.scala |  8 +++--
 .../spark/sql/internal/CatalogSuite.scala       |  5 +++
 5 files changed, 71 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/840853ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index d88b5ff..c0ebb2b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
@@ -253,9 +253,27 @@ class SessionCatalog(
   def getTableMetadata(name: TableIdentifier): CatalogTable = {
     val db = formatDatabaseName(name.database.getOrElse(getCurrentDatabase))
     val table = formatTableName(name.table)
-    requireDbExists(db)
-    requireTableExists(TableIdentifier(table, Some(db)))
-    externalCatalog.getTable(db, table)
+    val tid = TableIdentifier(table)
+    if (isTemporaryTable(name)) {
+      CatalogTable(
+        identifier = tid,
+        tableType = CatalogTableType.VIEW,
+        storage = CatalogStorageFormat.empty,
+        schema = tempTables(table).output.map { c =>
+          CatalogColumn(
+            name = c.name,
+            dataType = c.dataType.catalogString,
+            nullable = c.nullable,
+            comment = Option(c.name)
+          )
+        },
+        properties = Map(),
+        viewText = None)
+    } else {
+      requireDbExists(db)
+      requireTableExists(TableIdentifier(table, Some(db)))
+      externalCatalog.getTable(db, table)
+    }
   }
 
   /**
@@ -432,10 +450,10 @@ class SessionCatalog(
   def tableExists(name: TableIdentifier): Boolean = synchronized {
     val db = formatDatabaseName(name.database.getOrElse(currentDb))
     val table = formatTableName(name.table)
-    if (name.database.isDefined || !tempTables.contains(table)) {
-      externalCatalog.tableExists(db, table)
+    if (isTemporaryTable(name)) {
+      true
     } else {
-      true // it's a temporary table
+      externalCatalog.tableExists(db, table)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/840853ed/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 05eb302..adce5df 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -432,6 +432,39 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(catalog.tableExists(TableIdentifier("tbl3")))
   }
 
+  test("tableExists on temporary views") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    val tempTable = Range(1, 10, 2, 10)
+    assert(!catalog.tableExists(TableIdentifier("view1")))
+    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
+    catalog.createTempView("view1", tempTable, overrideIfExists = false)
+    assert(catalog.tableExists(TableIdentifier("view1")))
+    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
+  }
+
+  test("getTableMetadata on temporary views") {
+    val catalog = new SessionCatalog(newBasicCatalog())
+    val tempTable = Range(1, 10, 2, 10)
+    val m = intercept[AnalysisException] {
+      catalog.getTableMetadata(TableIdentifier("view1"))
+    }.getMessage
+    assert(m.contains("Table or view 'view1' not found in database 'default'"))
+
+    val m2 = intercept[AnalysisException] {
+      catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
+    }.getMessage
+    assert(m2.contains("Table or view 'view1' not found in database 
'default'"))
+
+    catalog.createTempView("view1", tempTable, overrideIfExists = false)
+    assert(catalog.getTableMetadata(TableIdentifier("view1")).identifier.table 
== "view1")
+    assert(catalog.getTableMetadata(TableIdentifier("view1")).schema(0).name 
== "id")
+
+    val m3 = intercept[AnalysisException] {
+      catalog.getTableMetadata(TableIdentifier("view1", Some("default")))
+    }.getMessage
+    assert(m3.contains("Table or view 'view1' not found in database 
'default'"))
+  }
+
   test("list tables without pattern") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val tempTable = Range(1, 10, 2, 10)

http://git-wip-us.apache.org/repos/asf/spark/blob/840853ed/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 91ed9b3..1aed245 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -85,7 +85,8 @@ abstract class Catalog {
   def listFunctions(dbName: String): Dataset[Function]
 
   /**
-   * Returns a list of columns for the given table in the current database.
+   * Returns a list of columns for the given table in the current database or
+   * the given temporary table.
    *
    * @since 2.0.0
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/840853ed/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 44babcc..a6ae6fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -138,7 +138,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
    */
   @throws[AnalysisException]("table does not exist")
   override def listColumns(tableName: String): Dataset[Column] = {
-    listColumns(currentDatabase, tableName)
+    listColumns(TableIdentifier(tableName, None))
   }
 
   /**
@@ -147,7 +147,11 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
   @throws[AnalysisException]("database or table does not exist")
   override def listColumns(dbName: String, tableName: String): Dataset[Column] 
= {
     requireTableExists(dbName, tableName)
-    val tableMetadata = 
sessionCatalog.getTableMetadata(TableIdentifier(tableName, Some(dbName)))
+    listColumns(TableIdentifier(tableName, Some(dbName)))
+  }
+
+  private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = 
{
+    val tableMetadata = sessionCatalog.getTableMetadata(tableIdentifier)
     val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
     val bucketColumnNames = tableMetadata.bucketColumnNames.toSet
     val columns = tableMetadata.schema.map { c =>

http://git-wip-us.apache.org/repos/asf/spark/blob/840853ed/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index d862e4c..d75df56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -234,6 +234,11 @@ class CatalogSuite
     testListColumns("tab1", dbName = None)
   }
 
+  test("list columns in temporary table") {
+    createTempTable("temp1")
+    spark.catalog.listColumns("temp1")
+  }
+
   test("list columns in database") {
     createDatabase("db1")
     createTable("tab1", Some("db1"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to