[SPARK-17338][SQL] add global temp view

## What changes were proposed in this pull request?

A global temporary view is a cross-session temporary view, which means it's
shared among all sessions. Its lifetime is the lifetime of the Spark
application, i.e. it is automatically dropped when the application
terminates. It's tied to a system preserved database `global_temp`
(configurable via SparkConf), and we must use the qualified name to refer to
a global temp view, e.g. `SELECT * FROM global_temp.view1`.
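
For example (a minimal sketch mirroring the examples added in this PR; it
assumes an active `spark: SparkSession`):

```scala
// Register a DataFrame as a global temporary view.
val df = spark.read.json("examples/src/main/resources/people.json")
df.createGlobalTempView("people")

// Global temp views live in the system preserved database `global_temp`,
// so the qualified name is required.
spark.sql("SELECT * FROM global_temp.people").show()

// The view is cross-session: a new session of the same application sees it.
spark.newSession().sql("SELECT * FROM global_temp.people").show()
```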

changes for `SessionCatalog`:

1. add a new field `globalTempViewManager: GlobalTempViewManager`, to access
the shared global temp views and the global temp database name.
2. `createDatabase` will fail if users try to create `global_temp`, which is
system preserved.
3. `setCurrentDatabase` will fail if users try to set `global_temp`, which is
system preserved.
4. add `createGlobalTempView`, which is used in `CreateViewCommand` to create
global temp views.
5. add `dropGlobalTempView`, which is used in `CatalogImpl` to drop global
temp views.
6. add `alterTempViewDefinition`, which is used in `AlterViewAsCommand` to
update the view definition of local/global temp views (see the sketch after
this list).
7. `renameTable`/`dropTable`/`isTemporaryTable`/`lookupRelation`/`getTempViewOrPermanentTableMetadata`/`refreshTable`
will handle global temp views.
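
A rough sketch of how the new `SessionCatalog` entry points fit together
(signatures taken from the diff below; `catalog`, `plan` and `newPlan` are
placeholders):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

def sketch(catalog: SessionCatalog, plan: LogicalPlan, newPlan: LogicalPlan): Unit = {
  // Create a global temp view; throws TempTableAlreadyExistsException if it
  // already exists and overrideIfExists is false.
  catalog.createGlobalTempView("view1", plan, overrideIfExists = false)

  // Update a local/global temp view definition; returns true if a temp view
  // was matched and altered.
  catalog.alterTempViewDefinition(TableIdentifier("view1", Some("global_temp")), newPlan)

  // Drop the global temp view; returns true if it existed.
  catalog.dropGlobalTempView("view1")
}
```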

changes for SQL commands:

1. `CreateViewCommand`/`AlterViewAsCommand` are updated to support global
temp views.
2. `ShowTablesCommand` outputs a new column `database`, which is used to
distinguish global and local temp views (see the sketch after this list).
3. other commands can also handle global temp views if they call the
`SessionCatalog` APIs which accept global temp views, e.g. `DropTableCommand`,
`AlterTableRenameCommand`, `ShowColumnsCommand`, etc.
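
For instance, with the default `global_temp` database name, `SHOW TABLES`
output now looks roughly like this (a sketch, not verbatim test output):

```scala
spark.sql("CREATE GLOBAL TEMPORARY VIEW gv AS SELECT 1")
spark.sql("CREATE TEMPORARY VIEW lv AS SELECT 2")

// Listing the global temp database also includes local temp views.
spark.sql("SHOW TABLES IN global_temp").show()
// +-----------+---------+-----------+
// |   database|tableName|isTemporary|
// +-----------+---------+-----------+
// |global_temp|       gv|       true|
// |           |       lv|       true|
// +-----------+---------+-----------+
```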

changes for other public APIs:

1. add a new method `dropGlobalTempView` in `Catalog`.
2. `Catalog.findTable` can find global temp views.
3. add a new method `createGlobalTempView` in `Dataset` (see the sketch after
this list).
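
Putting the public API pieces together (a sketch; assumes an active
`spark: SparkSession`):

```scala
import spark.implicits._

val df = Seq((1, "a")).toDF("i", "s")
df.createGlobalTempView("view1")           // new Dataset method

spark.table("global_temp.view1").show()    // resolved via the qualified name
spark.catalog.dropGlobalTempView("view1")  // new Catalog method, returns Boolean
```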

## How was this patch tested?

new tests in `SQLViewSuite` and a new `GlobalTempViewSuite`

Author: Wenchen Fan <[email protected]>

Closes #14897 from cloud-fan/global-temp-view.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23ddff4b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23ddff4b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23ddff4b

Branch: refs/heads/master
Commit: 23ddff4b2b2744c3dc84d928e144c541ad5df376
Parents: 1659003
Author: Wenchen Fan <[email protected]>
Authored: Mon Oct 10 15:48:57 2016 +0800
Committer: Wenchen Fan <[email protected]>
Committed: Mon Oct 10 15:48:57 2016 +0800

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |   7 +
 docs/sql-programming-guide.md                   |  45 ++++-
 .../spark/examples/sql/JavaSparkSQLExample.java |  30 ++-
 examples/src/main/python/sql/basic.py           |  25 +++
 .../spark/examples/sql/SparkSQLExample.scala    |  25 +++
 project/MimaExcludes.scala                      |   4 +-
 python/pyspark/sql/catalog.py                   |  18 +-
 python/pyspark/sql/context.py                   |   2 +-
 python/pyspark/sql/dataframe.py                 |  25 ++-
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   8 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  10 +-
 .../catalog/GlobalTempViewManager.scala         | 121 ++++++++++++
 .../sql/catalyst/catalog/SessionCatalog.scala   | 189 +++++++++++++++----
 .../scala/org/apache/spark/sql/Dataset.scala    |  48 ++++-
 .../org/apache/spark/sql/catalog/Catalog.scala  |  20 +-
 .../spark/sql/execution/QueryExecution.scala    |   8 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  19 +-
 .../spark/sql/execution/command/ddl.scala       |  25 +--
 .../spark/sql/execution/command/tables.scala    |  11 +-
 .../spark/sql/execution/command/views.scala     | 150 ++++++++-------
 .../spark/sql/execution/datasources/ddl.scala   |  20 +-
 .../apache/spark/sql/internal/CatalogImpl.scala |  26 ++-
 .../spark/sql/internal/SessionState.scala       |   1 +
 .../apache/spark/sql/internal/SharedState.scala |  75 +++++---
 .../org/apache/spark/sql/SQLContextSuite.scala  |  11 +-
 .../sql/execution/GlobalTempViewSuite.scala     | 168 +++++++++++++++++
 .../spark/sql/execution/command/DDLSuite.scala  |  10 +-
 .../spark/sql/hive/HiveSessionCatalog.scala     |   4 +-
 .../spark/sql/hive/HiveSessionState.scala       |   1 +
 .../hive/HiveContextCompatibilitySuite.scala    |   4 +-
 .../apache/spark/sql/hive/ListTablesSuite.scala |   8 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   2 +-
 .../sql/hive/execution/HiveCommandSuite.scala   |  10 +-
 .../spark/sql/hive/execution/SQLViewSuite.scala |   6 +-
 34 files changed, 906 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index d536cc5..0896e68 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -98,6 +98,13 @@ package object config {
     .checkValues(Set("hive", "in-memory"))
     .createWithDefault("in-memory")
 
+  // Note: This is a SQL config but needs to be in core because it's cross-session and cannot be
+  // put in SQLConf.
+  private[spark] val GLOBAL_TEMP_DATABASE = ConfigBuilder("spark.sql.globalTempDatabase")
+    .internal()
+    .stringConf
+    .createWithDefault("global_temp")
+
   private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
     ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
       .intConf

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 71bdd19..835cb69 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -220,6 +220,41 @@ The `sql` function enables applications to run SQL queries 
programmatically and
 </div>
 
 
+## Global Temporary View
+
+Temporary views in Spark SQL are session-scoped and will disappear if the session that creates
+them terminates. If you want to have a temporary view that is shared among all sessions and kept
+alive until the Spark application terminates, you can create a global temporary view. Global
+temporary views are tied to a system preserved database `global_temp`, and we must use the
+qualified name to refer to them, e.g. `SELECT * FROM global_temp.view1`.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+{% include_example global_temp_view 
scala/org/apache/spark/examples/sql/SparkSQLExample.scala %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% include_example global_temp_view 
java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}
+</div>
+
+<div data-lang="python"  markdown="1">
+{% include_example global_temp_view python/sql/basic.py %}
+</div>
+
+<div data-lang="sql"  markdown="1">
+
+{% highlight sql %}
+
+CREATE GLOBAL TEMPORARY VIEW temp_view AS SELECT a + 1, b * 2 FROM tbl
+
+SELECT * FROM global_temp.temp_view
+
+{% endhighlight %}
+
+</div>
+</div>
+
+
 ## Creating Datasets
 
 Datasets are similar to RDDs, however, instead of using Java serialization or 
Kryo they use
@@ -1058,14 +1093,14 @@ the Data Sources API. The following options are 
supported:
       The JDBC fetch size, which determines how many rows to fetch per round 
trip. This can help performance on JDBC drivers which default to low fetch size 
(eg. Oracle with 10 rows).
     </td>
   </tr>
-  
+
   <tr>
     <td><code>truncate</code></td>
     <td>
-     This is a JDBC writer related option. When 
<code>SaveMode.Overwrite</code> is enabled, this option causes Spark to 
truncate an existing table instead of dropping and recreating it. This can be 
more efficient, and prevents the table metadata (e.g. indices) from being 
removed. However, it will not work in some cases, such as when the new data has 
a different schema. It defaults to <code>false</code>. 
+     This is a JDBC writer related option. When 
<code>SaveMode.Overwrite</code> is enabled, this option causes Spark to 
truncate an existing table instead of dropping and recreating it. This can be 
more efficient, and prevents the table metadata (e.g. indices) from being 
removed. However, it will not work in some cases, such as when the new data has 
a different schema. It defaults to <code>false</code>.
    </td>
   </tr>
-  
+
   <tr>
     <td><code>createTableOptions</code></td>
     <td>
@@ -1101,11 +1136,11 @@ USING org.apache.spark.sql.jdbc
 OPTIONS (
   url "jdbc:postgresql:dbserver",
   dbtable "schema.tablename",
-  user 'username', 
+  user 'username',
   password 'password'
 )
 
-INSERT INTO TABLE jdbcTable 
+INSERT INTO TABLE jdbcTable
 SELECT * FROM resultTable
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
----------------------------------------------------------------------
diff --git 
a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java 
b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
index cff9032..c5770d1 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java
@@ -54,6 +54,7 @@ import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 // $example off:programmatic_schema$
+import org.apache.spark.sql.AnalysisException;
 
 // $example on:untyped_ops$
 // col("...") is preferable to df.col("...")
@@ -84,7 +85,7 @@ public class JavaSparkSQLExample {
   }
   // $example off:create_ds$
 
-  public static void main(String[] args) {
+  public static void main(String[] args) throws AnalysisException {
     // $example on:init_session$
     SparkSession spark = SparkSession
       .builder()
@@ -101,7 +102,7 @@ public class JavaSparkSQLExample {
     spark.stop();
   }
 
-  private static void runBasicDataFrameExample(SparkSession spark) {
+  private static void runBasicDataFrameExample(SparkSession spark) throws 
AnalysisException {
     // $example on:create_df$
     Dataset<Row> df = 
spark.read().json("examples/src/main/resources/people.json");
 
@@ -176,6 +177,31 @@ public class JavaSparkSQLExample {
     // |  19| Justin|
     // +----+-------+
     // $example off:run_sql$
+
+    // $example on:global_temp_view$
+    // Register the DataFrame as a global temporary view
+    df.createGlobalTempView("people");
+
+    // Global temporary view is tied to a system preserved database 
`global_temp`
+    spark.sql("SELECT * FROM global_temp.people").show();
+    // +----+-------+
+    // | age|   name|
+    // +----+-------+
+    // |null|Michael|
+    // |  30|   Andy|
+    // |  19| Justin|
+    // +----+-------+
+
+    // Global temporary view is cross-session
+    spark.newSession().sql("SELECT * FROM global_temp.people").show();
+    // +----+-------+
+    // | age|   name|
+    // +----+-------+
+    // |null|Michael|
+    // |  30|   Andy|
+    // |  19| Justin|
+    // +----+-------+
+    // $example off:global_temp_view$
   }
 
   private static void runDatasetCreationExample(SparkSession spark) {

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/examples/src/main/python/sql/basic.py
----------------------------------------------------------------------
diff --git a/examples/src/main/python/sql/basic.py 
b/examples/src/main/python/sql/basic.py
index fdc017a..ebcf669 100644
--- a/examples/src/main/python/sql/basic.py
+++ b/examples/src/main/python/sql/basic.py
@@ -114,6 +114,31 @@ def basic_df_example(spark):
     # +----+-------+
     # $example off:run_sql$
 
+    # $example on:global_temp_view$
+    # Register the DataFrame as a global temporary view
+    df.createGlobalTempView("people")
+
+    # Global temporary view is tied to a system preserved database 
`global_temp`
+    spark.sql("SELECT * FROM global_temp.people").show()
+    # +----+-------+
+    # | age|   name|
+    # +----+-------+
+    # |null|Michael|
+    # |  30|   Andy|
+    # |  19| Justin|
+    # +----+-------+
+
+    # Global temporary view is cross-session
+    spark.newSession().sql("SELECT * FROM global_temp.people").show()
+    # +----+-------+
+    # | age|   name|
+    # +----+-------+
+    # |null|Michael|
+    # |  30|   Andy|
+    # |  19| Justin|
+    # +----+-------+
+    # $example off:global_temp_view$
+
 
 def schema_inference_example(spark):
     # $example on:schema_inferring$

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala 
b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
index 129b81d..f27c403 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala
@@ -135,6 +135,31 @@ object SparkSQLExample {
     // |  19| Justin|
     // +----+-------+
     // $example off:run_sql$
+
+    // $example on:global_temp_view$
+    // Register the DataFrame as a global temporary view
+    df.createGlobalTempView("people")
+
+    // Global temporary view is tied to a system preserved database 
`global_temp`
+    spark.sql("SELECT * FROM global_temp.people").show()
+    // +----+-------+
+    // | age|   name|
+    // +----+-------+
+    // |null|Michael|
+    // |  30|   Andy|
+    // |  19| Justin|
+    // +----+-------+
+
+    // Global temporary view is cross-session
+    spark.newSession().sql("SELECT * FROM global_temp.people").show()
+    // +----+-------+
+    // | age|   name|
+    // +----+-------+
+    // |null|Michael|
+    // |  30|   Andy|
+    // |  19| Justin|
+    // +----+-------+
+    // $example off:global_temp_view$
   }
 
   private def runDatasetCreationExample(spark: SparkSession): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 163e3f2..e3d9a17 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -55,7 +55,9 @@ object MimaExcludes {
       
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.getFunction"),
       
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.databaseExists"),
       
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.tableExists"),
-      
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists")
+      
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.functionExists"),
+      // [SPARK-17338][SQL] add global temp view
+      
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView")
     )
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/python/pyspark/sql/catalog.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 3c50307..df3bf42 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -167,7 +167,7 @@ class Catalog(object):
 
     @since(2.0)
     def dropTempView(self, viewName):
-        """Drops the temporary view with the given view name in the catalog.
+        """Drops the local temporary view with the given view name in the 
catalog.
         If the view has been cached before, then it will also be uncached.
 
         >>> spark.createDataFrame([(1, 1)]).createTempView("my_table")
@@ -181,6 +181,22 @@ class Catalog(object):
         """
         self._jcatalog.dropTempView(viewName)
 
+    @since(2.1)
+    def dropGlobalTempView(self, viewName):
+        """Drops the global temporary view with the given view name in the 
catalog.
+        If the view has been cached before, then it will also be uncached.
+
+        >>> spark.createDataFrame([(1, 1)]).createGlobalTempView("my_table")
+        >>> spark.table("global_temp.my_table").collect()
+        [Row(_1=1, _2=1)]
+        >>> spark.catalog.dropGlobalTempView("my_table")
+        >>> spark.table("global_temp.my_table") # doctest: +IGNORE_EXCEPTION_DETAIL
+        Traceback (most recent call last):
+            ...
+        AnalysisException: ...
+        """
+        self._jcatalog.dropGlobalTempView(viewName)
+
     @ignore_unicode_prefix
     @since(2.0)
     def registerFunction(self, name, f, returnType=StringType()):

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 7482be8..8264dcf 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -386,7 +386,7 @@ class SQLContext(object):
         >>> sqlContext.registerDataFrameAsTable(df, "table1")
         >>> df2 = sqlContext.tables()
         >>> df2.filter("tableName = 'table1'").first()
-        Row(tableName=u'table1', isTemporary=True)
+        Row(database=u'', tableName=u'table1', isTemporary=True)
         """
         if dbName is None:
             return DataFrame(self._ssql_ctx.tables(), self)

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0ac481a..14e80ea 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -131,7 +131,7 @@ class DataFrame(object):
 
     @since(2.0)
     def createTempView(self, name):
-        """Creates a temporary view with this DataFrame.
+        """Creates a local temporary view with this DataFrame.
 
         The lifetime of this temporary table is tied to the 
:class:`SparkSession`
         that was used to create this :class:`DataFrame`.
@@ -153,7 +153,7 @@ class DataFrame(object):
 
     @since(2.0)
     def createOrReplaceTempView(self, name):
-        """Creates or replaces a temporary view with this DataFrame.
+        """Creates or replaces a local temporary view with this DataFrame.
 
         The lifetime of this temporary table is tied to the 
:class:`SparkSession`
         that was used to create this :class:`DataFrame`.
@@ -169,6 +169,27 @@ class DataFrame(object):
         """
         self._jdf.createOrReplaceTempView(name)
 
+    @since(2.1)
+    def createGlobalTempView(self, name):
+        """Creates a global temporary view with this DataFrame.
+
+        The lifetime of this temporary view is tied to this Spark application.
+        Throws :class:`TempTableAlreadyExistsException` if the view name already exists in the
+        catalog.
+
+        >>> df.createGlobalTempView("people")
+        >>> df2 = spark.sql("select * from global_temp.people")
+        >>> sorted(df.collect()) == sorted(df2.collect())
+        True
+        >>> df.createGlobalTempView("people")  # doctest: +IGNORE_EXCEPTION_DETAIL
+        Traceback (most recent call last):
+        ...
+        AnalysisException: u"Temporary table 'people' already exists;"
+        >>> spark.catalog.dropGlobalTempView("people")
+
+        """
+        self._jdf.createGlobalTempView(name)
+
     @property
     @since(1.4)
     def write(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index a3bbace..b599a88 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -111,11 +111,12 @@ statement
     | ALTER TABLE tableIdentifier RECOVER PARTITIONS                   
#recoverPartitions
     | DROP TABLE (IF EXISTS)? tableIdentifier PURGE?                   
#dropTable
     | DROP VIEW (IF EXISTS)? tableIdentifier                           
#dropTable
-    | CREATE (OR REPLACE)? TEMPORARY? VIEW (IF NOT EXISTS)? tableIdentifier
+    | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
+        VIEW (IF NOT EXISTS)? tableIdentifier
         identifierCommentList? (COMMENT STRING)?
         (PARTITIONED ON identifierList)?
         (TBLPROPERTIES tablePropertyList)? AS query                    
#createView
-    | CREATE (OR REPLACE)? TEMPORARY VIEW
+    | CREATE (OR REPLACE)? GLOBAL? TEMPORARY VIEW
         tableIdentifier ('(' colTypeList ')')? tableProvider
         (OPTIONS tablePropertyList)?                                   
#createTempViewUsing
     | ALTER VIEW tableIdentifier AS? query                             
#alterViewQuery
@@ -676,7 +677,7 @@ nonReserved
     | MAP | ARRAY | STRUCT
     | LATERAL | WINDOW | REDUCE | TRANSFORM | USING | SERDE | SERDEPROPERTIES 
| RECORDREADER
     | DELIMITED | FIELDS | TERMINATED | COLLECTION | ITEMS | KEYS | ESCAPED | 
LINES | SEPARATED
-    | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | TEMPORARY | OPTIONS
+    | EXTENDED | REFRESH | CLEAR | CACHE | UNCACHE | LAZY | GLOBAL | TEMPORARY 
| OPTIONS
     | GROUPING | CUBE | ROLLUP
     | EXPLAIN | FORMAT | LOGICAL | FORMATTED | CODEGEN
     | TABLESAMPLE | USE | TO | BUCKET | PERCENTLIT | OUT | OF
@@ -864,6 +865,7 @@ CACHE: 'CACHE';
 UNCACHE: 'UNCACHE';
 LAZY: 'LAZY';
 FORMATTED: 'FORMATTED';
+GLOBAL: 'GLOBAL';
 TEMPORARY: 'TEMPORARY' | 'TEMP';
 OPTIONS: 'OPTIONS';
 UNSET: 'UNSET';

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ae8869f..536d387 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -458,12 +458,12 @@ class Analyzer(
         i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
       case u: UnresolvedRelation =>
         val table = u.tableIdentifier
-        if (table.database.isDefined && conf.runSQLonFile &&
+        if (table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
             (!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))) {
-          // If the table does not exist, and the database part is specified, 
and we support
-          // running SQL directly on files, then let's just return the 
original UnresolvedRelation.
-          // It is possible we are matching a query like "select * from 
parquet.`/path/to/query`".
-          // The plan will get resolved later.
+          // If the database part is specified, and we support running SQL 
directly on files, and
+          // it's not a temporary view, and the table does not exist, then 
let's just return the
+          // original UnresolvedRelation. It is possible we are matching a 
query like "select *
+          // from parquet.`/path/to/query`". The plan will get resolved later.
           // Note that we are testing (!db_exists || !table_exists) because 
the catalog throws
           // an exception from tableExists if the database does not exist.
           u

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
new file mode 100644
index 0000000..6095ac0
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/GlobalTempViewManager.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.TempTableAlreadyExistsException
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.StringUtils
+
+
+/**
+ * A thread-safe manager for global temporary views, providing atomic 
operations to manage them,
+ * e.g. create, update, remove, etc.
+ *
+ * Note that the view name is always case-sensitive here; callers are responsible for formatting
+ * the view name w.r.t. the case-sensitivity config.
+ *
+ * @param database The system preserved virtual database that keeps all the 
global temporary views.
+ */
+class GlobalTempViewManager(val database: String) {
+
+  /** List of view definitions, mapping from view name to logical plan. */
+  @GuardedBy("this")
+  private val viewDefinitions = new mutable.HashMap[String, LogicalPlan]
+
+  /**
+   * Returns the global view definition which matches the given name, or None 
if not found.
+   */
+  def get(name: String): Option[LogicalPlan] = synchronized {
+    viewDefinitions.get(name)
+  }
+
+  /**
+   * Creates a global temp view, or issues an exception if the view already exists and
+   * `overrideIfExists` is false.
+   */
+  def create(
+      name: String,
+      viewDefinition: LogicalPlan,
+      overrideIfExists: Boolean): Unit = synchronized {
+    if (!overrideIfExists && viewDefinitions.contains(name)) {
+      throw new TempTableAlreadyExistsException(name)
+    }
+    viewDefinitions.put(name, viewDefinition)
+  }
+
+  /**
+   * Updates the global temp view if it exists, returns true if updated, false 
otherwise.
+   */
+  def update(
+      name: String,
+      viewDefinition: LogicalPlan): Boolean = synchronized {
+    if (viewDefinitions.contains(name)) {
+      viewDefinitions.put(name, viewDefinition)
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Removes the global temp view if it exists, returns true if removed, false 
otherwise.
+   */
+  def remove(name: String): Boolean = synchronized {
+    viewDefinitions.remove(name).isDefined
+  }
+
+  /**
+   * Renames the global temp view if the source view exists and the destination view does not
+   * exist, or issues an exception if the source view exists but the destination view already
+   * exists. Returns true if renamed, false otherwise.
+   */
+  def rename(oldName: String, newName: String): Boolean = synchronized {
+    if (viewDefinitions.contains(oldName)) {
+      if (viewDefinitions.contains(newName)) {
+        throw new AnalysisException(
+          s"rename temporary view from '$oldName' to '$newName': destination 
view already exists")
+      }
+
+      val viewDefinition = viewDefinitions(oldName)
+      viewDefinitions.remove(oldName)
+      viewDefinitions.put(newName, viewDefinition)
+      true
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Lists the names of all global temporary views.
+   */
+  def listViewNames(pattern: String): Seq[String] = synchronized {
+    StringUtils.filterPattern(viewDefinitions.keys.toSeq, pattern)
+  }
+
+  /**
+   * Clears all the global temporary views.
+   */
+  def clear(): Unit = synchronized {
+    viewDefinitions.clear()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8c01c7a..e44e30e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.GLOBAL_TEMP_DATABASE
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
@@ -47,6 +48,7 @@ object SessionCatalog {
  */
 class SessionCatalog(
     externalCatalog: ExternalCatalog,
+    globalTempViewManager: GlobalTempViewManager,
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
     conf: CatalystConf,
@@ -61,6 +63,7 @@ class SessionCatalog(
       conf: CatalystConf) {
     this(
       externalCatalog,
+      new GlobalTempViewManager(GLOBAL_TEMP_DATABASE.defaultValueString),
       DummyFunctionResourceLoader,
       functionRegistry,
       conf,
@@ -142,8 +145,13 @@ class SessionCatalog(
   // 
----------------------------------------------------------------------------
 
   def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): 
Unit = {
-    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
     val dbName = formatDatabaseName(dbDefinition.name)
+    if (dbName == globalTempViewManager.database) {
+      throw new AnalysisException(
+        s"${globalTempViewManager.database} is a system preserved database, " +
+          "you cannot create a database with this name.")
+    }
+    val qualifiedPath = makeQualifiedPath(dbDefinition.locationUri).toString
     externalCatalog.createDatabase(
       dbDefinition.copy(name = dbName, locationUri = qualifiedPath),
       ignoreIfExists)
@@ -154,7 +162,7 @@ class SessionCatalog(
     if (dbName == DEFAULT_DATABASE) {
       throw new AnalysisException(s"Can not drop default database")
     } else if (dbName == getCurrentDatabase) {
-      throw new AnalysisException(s"Can not drop current database `${dbName}`")
+      throw new AnalysisException(s"Can not drop current database `$dbName`")
     }
     externalCatalog.dropDatabase(dbName, ignoreIfNotExists, cascade)
   }
@@ -188,6 +196,13 @@ class SessionCatalog(
 
   def setCurrentDatabase(db: String): Unit = {
     val dbName = formatDatabaseName(db)
+    if (dbName == globalTempViewManager.database) {
+      throw new AnalysisException(
+        s"${globalTempViewManager.database} is a system preserved database, " +
+          "you cannot use it as current database. To access global temporary 
views, you should " +
+          "use qualified name with the GLOBAL_TEMP_DATABASE, e.g. SELECT * 
FROM " +
+          s"${globalTempViewManager.database}.viewName.")
+    }
     requireDbExists(dbName)
     synchronized { currentDb = dbName }
   }
@@ -329,7 +344,7 @@ class SessionCatalog(
   // ----------------------------------------------
 
   /**
-   * Create a temporary table.
+   * Create a local temporary view.
    */
   def createTempView(
       name: String,
@@ -343,19 +358,65 @@ class SessionCatalog(
   }
 
   /**
-   * Return a temporary view exactly as it was stored.
+   * Create a global temporary view.
+   */
+  def createGlobalTempView(
+      name: String,
+      viewDefinition: LogicalPlan,
+      overrideIfExists: Boolean): Unit = {
+    globalTempViewManager.create(formatTableName(name), viewDefinition, 
overrideIfExists)
+  }
+
+  /**
+   * Alter the definition of a local/global temp view matching the given name, 
returns true if a
+   * temp view is matched and altered, false otherwise.
+   */
+  def alterTempViewDefinition(
+      name: TableIdentifier,
+      viewDefinition: LogicalPlan): Boolean = synchronized {
+    val viewName = formatTableName(name.table)
+    if (name.database.isEmpty) {
+      if (tempTables.contains(viewName)) {
+        createTempView(viewName, viewDefinition, overrideIfExists = true)
+        true
+      } else {
+        false
+      }
+    } else if (formatDatabaseName(name.database.get) == 
globalTempViewManager.database) {
+      globalTempViewManager.update(viewName, viewDefinition)
+    } else {
+      false
+    }
+  }
+
+  /**
+   * Return a local temporary view exactly as it was stored.
    */
   def getTempView(name: String): Option[LogicalPlan] = synchronized {
     tempTables.get(formatTableName(name))
   }
 
   /**
-   * Drop a temporary view.
+   * Return a global temporary view exactly as it was stored.
+   */
+  def getGlobalTempView(name: String): Option[LogicalPlan] = {
+    globalTempViewManager.get(formatTableName(name))
+  }
+
+  /**
+   * Drop a local temporary view.
    */
   def dropTempView(name: String): Unit = synchronized {
     tempTables.remove(formatTableName(name))
   }
 
+  /**
+   * Drop a global temporary view.
+   */
+  def dropGlobalTempView(name: String): Boolean = {
+    globalTempViewManager.remove(formatTableName(name))
+  }
+
   // -------------------------------------------------------------
   // | Methods that interact with temporary and metastore tables |
   // -------------------------------------------------------------
@@ -371,9 +432,7 @@ class SessionCatalog(
    */
   def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable 
= synchronized {
     val table = formatTableName(name.table)
-    if (name.database.isDefined) {
-      getTableMetadata(name)
-    } else {
+    if (name.database.isEmpty) {
       getTempView(table).map { plan =>
         CatalogTable(
           identifier = TableIdentifier(table),
@@ -381,6 +440,16 @@ class SessionCatalog(
           storage = CatalogStorageFormat.empty,
           schema = plan.output.toStructType)
       }.getOrElse(getTableMetadata(name))
+    } else if (formatDatabaseName(name.database.get) == 
globalTempViewManager.database) {
+      globalTempViewManager.get(table).map { plan =>
+        CatalogTable(
+          identifier = TableIdentifier(table, 
Some(globalTempViewManager.database)),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = plan.output.toStructType)
+      }.getOrElse(throw new 
NoSuchTableException(globalTempViewManager.database, table))
+    } else {
+      getTableMetadata(name)
     }
   }
 
@@ -393,21 +462,25 @@ class SessionCatalog(
    */
   def renameTable(oldName: TableIdentifier, newName: String): Unit = 
synchronized {
     val db = formatDatabaseName(oldName.database.getOrElse(currentDb))
-    requireDbExists(db)
     val oldTableName = formatTableName(oldName.table)
     val newTableName = formatTableName(newName)
-    if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
-      requireTableExists(TableIdentifier(oldTableName, Some(db)))
-      requireTableNotExists(TableIdentifier(newTableName, Some(db)))
-      externalCatalog.renameTable(db, oldTableName, newTableName)
+    if (db == globalTempViewManager.database) {
+      globalTempViewManager.rename(oldTableName, newTableName)
     } else {
-      if (tempTables.contains(newTableName)) {
-        throw new AnalysisException(
-          s"RENAME TEMPORARY TABLE from '$oldName' to '$newName': destination 
table already exists")
+      requireDbExists(db)
+      if (oldName.database.isDefined || !tempTables.contains(oldTableName)) {
+        requireTableExists(TableIdentifier(oldTableName, Some(db)))
+        requireTableNotExists(TableIdentifier(newTableName, Some(db)))
+        externalCatalog.renameTable(db, oldTableName, newTableName)
+      } else {
+        if (tempTables.contains(newTableName)) {
+          throw new AnalysisException(s"RENAME TEMPORARY TABLE from '$oldName' 
to '$newName': " +
+            "destination table already exists")
+        }
+        val table = tempTables(oldTableName)
+        tempTables.remove(oldTableName)
+        tempTables.put(newTableName, table)
       }
-      val table = tempTables(oldTableName)
-      tempTables.remove(oldTableName)
-      tempTables.put(newTableName, table)
     }
   }
 
@@ -424,17 +497,24 @@ class SessionCatalog(
       purge: Boolean): Unit = synchronized {
     val db = formatDatabaseName(name.database.getOrElse(currentDb))
     val table = formatTableName(name.table)
-    if (name.database.isDefined || !tempTables.contains(table)) {
-      requireDbExists(db)
-      // When ignoreIfNotExists is false, no exception is issued when the 
table does not exist.
-      // Instead, log it as an error message.
-      if (tableExists(TableIdentifier(table, Option(db)))) {
-        externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge = 
purge)
-      } else if (!ignoreIfNotExists) {
-        throw new NoSuchTableException(db = db, table = table)
+    if (db == globalTempViewManager.database) {
+      val viewExists = globalTempViewManager.remove(table)
+      if (!viewExists && !ignoreIfNotExists) {
+        throw new NoSuchTableException(globalTempViewManager.database, table)
       }
     } else {
-      tempTables.remove(table)
+      if (name.database.isDefined || !tempTables.contains(table)) {
+        requireDbExists(db)
+        // When ignoreIfNotExists is false, no exception is issued when the 
table does not exist.
+        // Instead, log it as an error message.
+        if (tableExists(TableIdentifier(table, Option(db)))) {
+          externalCatalog.dropTable(db, table, ignoreIfNotExists = true, purge 
= purge)
+        } else if (!ignoreIfNotExists) {
+          throw new NoSuchTableException(db = db, table = table)
+        }
+      } else {
+        tempTables.remove(table)
+      }
     }
   }
 
@@ -445,6 +525,9 @@ class SessionCatalog(
    * If no database is specified, this will first attempt to return a 
temporary table/view with
    * the same name, then, if that does not exist, return the table/view from 
the current database.
    *
+   * Note that the global temp view database is also valid here; this will return the global
+   * temp view matching the given name.
+   *
    * If the relation is a view, the relation will be wrapped in a 
[[SubqueryAlias]] which will
    * track the name of the view.
    */
@@ -453,7 +536,11 @@ class SessionCatalog(
       val db = formatDatabaseName(name.database.getOrElse(currentDb))
       val table = formatTableName(name.table)
       val relationAlias = alias.getOrElse(table)
-      if (name.database.isDefined || !tempTables.contains(table)) {
+      if (db == globalTempViewManager.database) {
+        globalTempViewManager.get(table).map { viewDef =>
+          SubqueryAlias(relationAlias, viewDef, Some(name))
+        }.getOrElse(throw new NoSuchTableException(db, table))
+      } else if (name.database.isDefined || !tempTables.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)
         val view = Option(metadata.tableType).collect {
           case CatalogTableType.VIEW => name
@@ -472,27 +559,48 @@ class SessionCatalog(
    * explicitly specified.
    */
   def isTemporaryTable(name: TableIdentifier): Boolean = synchronized {
-    name.database.isEmpty && tempTables.contains(formatTableName(name.table))
+    val table = formatTableName(name.table)
+    if (name.database.isEmpty) {
+      tempTables.contains(table)
+    } else if (formatDatabaseName(name.database.get) == 
globalTempViewManager.database) {
+      globalTempViewManager.get(table).isDefined
+    } else {
+      false
+    }
   }
 
   /**
-   * List all tables in the specified database, including temporary tables.
+   * List all tables in the specified database, including local temporary 
tables.
+   *
+   * Note that if the specified database is the global temporary view database, we will list
+   * global temporary views.
    */
   def listTables(db: String): Seq[TableIdentifier] = listTables(db, "*")
 
   /**
-   * List all matching tables in the specified database, including temporary 
tables.
+   * List all matching tables in the specified database, including local 
temporary tables.
+   *
+   * Note that if the specified database is the global temporary view database, we will list
+   * global temporary views.
    */
   def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
     val dbName = formatDatabaseName(db)
-    requireDbExists(dbName)
-    val dbTables =
-      externalCatalog.listTables(dbName, pattern).map { t => 
TableIdentifier(t, Some(dbName)) }
-    synchronized {
-      val _tempTables = StringUtils.filterPattern(tempTables.keys.toSeq, 
pattern)
-        .map { t => TableIdentifier(t) }
-      dbTables ++ _tempTables
+    val dbTables = if (dbName == globalTempViewManager.database) {
+      globalTempViewManager.listViewNames(pattern).map { name =>
+        TableIdentifier(name, Some(globalTempViewManager.database))
+      }
+    } else {
+      requireDbExists(dbName)
+      externalCatalog.listTables(dbName, pattern).map { name =>
+        TableIdentifier(name, Some(dbName))
+      }
+    }
+    val localTempViews = synchronized {
+      StringUtils.filterPattern(tempTables.keys.toSeq, pattern).map { name =>
+        TableIdentifier(name)
+      }
     }
+    dbTables ++ localTempViews
   }
 
   /**
@@ -504,6 +612,8 @@ class SessionCatalog(
     // If the database is not defined, there is a good chance this is a temp 
table.
     if (name.database.isEmpty) {
       tempTables.get(formatTableName(name.table)).foreach(_.refresh())
+    } else if (formatDatabaseName(name.database.get) == 
globalTempViewManager.database) {
+      
globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
     }
   }
 
@@ -919,6 +1029,7 @@ class SessionCatalog(
       }
     }
     tempTables.clear()
+    globalTempViewManager.clear()
     functionRegistry.clear()
     // restore built-in functions
     FunctionRegistry.builtin.listFunction().foreach { f =>

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 9cfbdff..4b52508 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, 
QueryExecution, SQLExecution}
-import org.apache.spark.sql.execution.command.{CreateViewCommand, 
ExplainCommand}
+import org.apache.spark.sql.execution.command.{CreateViewCommand, 
ExplainCommand, GlobalTempView, LocalTempView}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
@@ -2433,9 +2433,13 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Creates a temporary view using the given name. The lifetime of this
+   * Creates a local temporary view using the given name. The lifetime of this
    * temporary view is tied to the [[SparkSession]] that was used to create 
this Dataset.
    *
+   * Local temporary view is session-scoped. Its lifetime is the lifetime of 
the session that
+   * created it, i.e. it will be automatically dropped when the session 
terminates. It's not
+   * tied to any databases, i.e. we can't use `db1.view1` to reference a local 
temporary view.
+   *
    * @throws AnalysisException if the view name already exists
    *
    * @group basic
@@ -2443,21 +2447,51 @@ class Dataset[T] private[sql](
    */
   @throws[AnalysisException]
   def createTempView(viewName: String): Unit = withPlan {
-    createViewCommand(viewName, replace = false)
+    createTempViewCommand(viewName, replace = false, global = false)
   }
 
+
+
   /**
-   * Creates a temporary view using the given name. The lifetime of this
+   * Creates a local temporary view using the given name. The lifetime of this
    * temporary view is tied to the [[SparkSession]] that was used to create 
this Dataset.
    *
    * @group basic
    * @since 2.0.0
    */
   def createOrReplaceTempView(viewName: String): Unit = withPlan {
-    createViewCommand(viewName, replace = true)
+    createTempViewCommand(viewName, replace = true, global = false)
   }
 
-  private def createViewCommand(viewName: String, replace: Boolean): 
CreateViewCommand = {
+  /**
+   * Creates a global temporary view using the given name. The lifetime of this
+   * temporary view is tied to this Spark application.
+   *
+   * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark
+   * application, i.e. it will be automatically dropped when the application terminates. It's
+   * tied to a system preserved database `global_temp`, and we must use the qualified name to
+   * refer to a global temp view, e.g. `SELECT * FROM global_temp.view1`.
+   *
+   * @throws TempTableAlreadyExistsException if the view name already exists
+   *
+   * @group basic
+   * @since 2.1.0
+   */
+  @throws[AnalysisException]
+  def createGlobalTempView(viewName: String): Unit = withPlan {
+    createTempViewCommand(viewName, replace = false, global = true)
+  }
+
+  private def createTempViewCommand(
+      viewName: String,
+      replace: Boolean,
+      global: Boolean): CreateViewCommand = {
+    val viewType = if (global) {
+      GlobalTempView
+    } else {
+      LocalTempView
+    }
+
     CreateViewCommand(
       name = 
sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
       userSpecifiedColumns = Nil,
@@ -2467,7 +2501,7 @@ class Dataset[T] private[sql](
       child = logicalPlan,
       allowExisting = false,
       replace = replace,
-      isTemporary = true)
+      viewType = viewType)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 7f2762c..717fb29 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -262,15 +262,33 @@ abstract class Catalog {
       options: Map[String, String]): DataFrame
 
   /**
-   * Drops the temporary view with the given view name in the catalog.
+   * Drops the local temporary view with the given view name in the catalog.
    * If the view has been cached before, then it will also be uncached.
    *
+   * Local temporary view is session-scoped. Its lifetime is the lifetime of 
the session that
+   * created it, i.e. it will be automatically dropped when the session 
terminates. It's not
+   * tied to any databases, i.e. we can't use `db1.view1` to reference a local 
temporary view.
+   *
    * @param viewName the name of the view to be dropped.
    * @since 2.0.0
    */
   def dropTempView(viewName: String): Unit
 
   /**
+   * Drops the global temporary view with the given view name in the catalog.
+   * If the view has been cached before, then it will also be uncached.
+   *
+   * Global temporary view is cross-session. Its lifetime is the lifetime of the Spark
+   * application, i.e. it will be automatically dropped when the application terminates. It's
+   * tied to a system preserved database `global_temp`, and we must use the qualified name to
+   * refer to a global temp view, e.g. `SELECT * FROM global_temp.view1`.
+   *
+   * @param viewName the name of the view to be dropped.
+   * @since 2.1.0
+   */
+  def dropGlobalTempView(viewName: String): Boolean
+
+  /**
    * Returns true if the table is currently cached in-memory.
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 383b3a2..cb45a6d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -21,15 +21,14 @@ import java.nio.charset.StandardCharsets
 import java.sql.Timestamp
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.command.{DescribeTableCommand, 
ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.{DescribeTableCommand, 
ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, 
ReuseExchange}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, 
TimestampType, _}
 import org.apache.spark.util.Utils
 
@@ -125,6 +124,9 @@ class QueryExecution(val sparkSession: SparkSession, val 
logical: LogicalPlan) {
               .mkString("\t")
         }
       }
+    // SHOW TABLES in Hive only outputs table names, while ours outputs database, table name,
+    // isTemp.
+    case command: ExecutedCommandExec if 
command.cmd.isInstanceOf[ShowTablesCommand] =>
+      command.executeCollect().map(_.getString(1))
     case command: ExecutedCommandExec =>
       command.executeCollect().map(_.getString(0))
     case other =>

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 5f87b71..be2eddb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -29,9 +29,9 @@ import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
OneRowRelation, ScriptInputOutputSchema}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTable, 
CreateTempViewUsing, _}
+import org.apache.spark.sql.execution.datasources.{CreateTable, _}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.StructType
 
 /**
  * Concrete parser for Spark SQL statements.
@@ -385,7 +385,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
         logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, 
please use " +
           "CREATE TEMPORARY VIEW ... USING ... instead")
-        CreateTempViewUsing(table, schema, replace = true, provider, options)
+        CreateTempViewUsing(table, schema, replace = true, global = false, 
provider, options)
       } else {
         CreateTable(tableDesc, mode, None)
       }
@@ -401,6 +401,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       tableIdent = visitTableIdentifier(ctx.tableIdentifier()),
       userSpecifiedSchema = Option(ctx.colTypeList()).map(createSchema),
       replace = ctx.REPLACE != null,
+      global = ctx.GLOBAL != null,
       provider = ctx.tableProvider.qualifiedName.getText,
       options = 
Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
   }
@@ -1269,7 +1270,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
    *
    * For example:
    * {{{
-   *   CREATE [OR REPLACE] [TEMPORARY] VIEW [IF NOT EXISTS] [db_name.]view_name
+   *   CREATE [OR REPLACE] [[GLOBAL] TEMPORARY] VIEW [IF NOT EXISTS] 
[db_name.]view_name
    *   [(column_name [COMMENT column_comment], ...) ]
    *   [COMMENT view_comment]
    *   [TBLPROPERTIES (property_name = property_value, ...)]
@@ -1286,6 +1287,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
         }
       }
 
+      val viewType = if (ctx.TEMPORARY == null) {
+        PersistedView
+      } else if (ctx.GLOBAL != null) {
+        GlobalTempView
+      } else {
+        LocalTempView
+      }
+
       CreateViewCommand(
         name = visitTableIdentifier(ctx.tableIdentifier),
         userSpecifiedColumns = userSpecifiedColumns,
@@ -1295,7 +1304,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
         child = plan(ctx.query),
         allowExisting = ctx.EXISTS != null,
         replace = ctx.REPLACE != null,
-        isTemporary = ctx.TEMPORARY != null)
+        viewType = viewType)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 01ac898..45fa293 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,17 +183,20 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a 
view
-    // issue an exception.
-    catalog.getTableMetadataOption(tableName).map(_.tableType match {
-      case CatalogTableType.VIEW if !isView =>
-        throw new AnalysisException(
-          "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
-      case o if o != CatalogTableType.VIEW && isView =>
-        throw new AnalysisException(
-          s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
-      case _ =>
-    })
+
+    if (!catalog.isTemporaryTable(tableName) && 
catalog.tableExists(tableName)) {
+      // If the command DROP VIEW is to drop a table or DROP TABLE is to drop 
a view
+      // issue an exception.
+      catalog.getTableMetadata(tableName).tableType match {
+        case CatalogTableType.VIEW if !isView =>
+          throw new AnalysisException(
+            "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+        case o if o != CatalogTableType.VIEW && isView =>
+          throw new AnalysisException(
+            s"Cannot drop a table with DROP VIEW. Please use DROP TABLE 
instead")
+        case _ =>
+      }
+    }
     try {
       sparkSession.sharedState.cacheManager.uncacheQuery(
         sparkSession.table(tableName.quotedString))
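
With this change, DROP TABLE/DROP VIEW only performs the table-type check for non-temporary relations; temporary views skip it. A rough sketch of the expected behavior, with hypothetical names:

    spark.sql("CREATE TABLE t1(i INT)")
    spark.sql("DROP VIEW t1")     // AnalysisException: use DROP TABLE instead
    spark.sql("CREATE TEMP VIEW v1 AS SELECT 1")
    spark.sql("DROP TABLE v1")    // no type check for temp views; the view is dropped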

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 08de6cd..424ef58 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -579,9 +579,10 @@ case class ShowTablesCommand(
     databaseName: Option[String],
     tableIdentifierPattern: Option[String]) extends RunnableCommand {
 
-  // The result of SHOW TABLES has two columns, tableName and isTemporary.
+  // The result of SHOW TABLES has three columns: database, tableName and isTemporary.
   override val output: Seq[Attribute] = {
-    AttributeReference("tableName", StringType, nullable = false)() ::
+    AttributeReference("database", StringType, nullable = false)() ::
+      AttributeReference("tableName", StringType, nullable = false)() ::
       AttributeReference("isTemporary", BooleanType, nullable = false)() :: Nil
   }
 
@@ -592,9 +593,9 @@ case class ShowTablesCommand(
     val db = databaseName.getOrElse(catalog.getCurrentDatabase)
     val tables =
       tableIdentifierPattern.map(catalog.listTables(db, _)).getOrElse(catalog.listTables(db))
-    tables.map { t =>
-      val isTemp = t.database.isEmpty
-      Row(t.table, isTemp)
+    tables.map { tableIdent =>
+      val isTemp = catalog.isTemporaryTable(tableIdent)
+      Row(tableIdent.database.getOrElse(""), tableIdent.table, isTemp)
     }
   }
 }
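
A hedged sketch of the new three-column SHOW TABLES output; the view name is invented, and the empty database column marks a local temp view:

    spark.sql("CREATE TEMP VIEW v1 AS SELECT 1")
    spark.sql("SHOW TABLES").show()
    // +--------+---------+-----------+
    // |database|tableName|isTemporary|
    // +--------+---------+-----------+
    // |        |       v1|       true|
    // +--------+---------+-----------+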

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 15340ee..bbcd9c4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -19,13 +19,46 @@ package org.apache.spark.sql.execution.command
 
 import scala.util.control.NonFatal
 
-import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{SQLBuilder, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Alias
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.execution.datasources.{DataSource, LogicalRelation}
+import org.apache.spark.sql.types.{MetadataBuilder, StructType}
+
+
+/**
+ * ViewType is used to specify the expected view type when we want to create or replace a view in
+ * [[CreateViewCommand]].
+ */
+sealed trait ViewType
+
+/**
+ * LocalTempView means session-scoped local temporary views. Its lifetime is the lifetime of the
+ * session that created it, i.e. it will be automatically dropped when the session terminates. It's
+ * not tied to any database, i.e. we can't use `db1.view1` to reference a local temporary view.
+ */
+object LocalTempView extends ViewType
+
+/**
+ * GlobalTempView means cross-session global temporary views. Its lifetime is the lifetime of the
+ * Spark application, i.e. it will be automatically dropped when the application terminates. It's
+ * tied to the system preserved database `global_temp`, and we must use the qualified name to refer
+ * to a global temp view, e.g. SELECT * FROM global_temp.view1.
+ */
+object GlobalTempView extends ViewType
+
+/**
+ * PersistedView means cross-session persisted views. Persisted views stay until they are
+ * explicitly dropped by user command. It's always tied to a database, defaulting to the current
+ * database if not specified.
+ *
+ * Note that an existing persisted view with the same name is not visible to the current session
+ * while a local temporary view with that name exists, unless the view name is qualified by
+ * database.
+ */
+object PersistedView extends ViewType
 
 
 /**
@@ -46,10 +79,7 @@ import org.apache.spark.sql.types.StructType
  *                already exists, throws analysis exception.
  * @param replace if true, and if the view already exists, updates it; if false, and if the view
  *                already exists, throws analysis exception.
- * @param isTemporary if true, the view is created as a temporary view. Temporary views are dropped
- *                 at the end of current Spark session. Existing permanent relations with the same
- *                 name are not visible to the current session while the temporary view exists,
- *                 unless they are specified with full qualified table name with database prefix.
+ * @param viewType the expected view type to be created with this command.
  */
 case class CreateViewCommand(
     name: TableIdentifier,
@@ -60,20 +90,21 @@ case class CreateViewCommand(
     child: LogicalPlan,
     allowExisting: Boolean,
     replace: Boolean,
-    isTemporary: Boolean)
+    viewType: ViewType)
   extends RunnableCommand {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(child)
 
-  if (!isTemporary) {
-    require(originalText.isDefined,
-      "The table to created with CREATE VIEW must have 'originalText'.")
+  if (viewType == PersistedView) {
+    require(originalText.isDefined, "'originalText' must be provided to create permanent view")
   }
 
   if (allowExisting && replace) {
     throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and 
REPLACE is not allowed.")
   }
 
+  private def isTemporary = viewType == LocalTempView || viewType == GlobalTempView
+
   // Disallows 'CREATE TEMPORARY VIEW IF NOT EXISTS' to be consistent with 'CREATE TEMPORARY TABLE'
   if (allowExisting && isTemporary) {
     throw new AnalysisException(
@@ -99,72 +130,53 @@ case class CreateViewCommand(
         s"(num: `${analyzedPlan.output.length}`) does not match the number of 
column names " +
         s"specified by CREATE VIEW (num: `${userSpecifiedColumns.length}`).")
     }
-    val sessionState = sparkSession.sessionState
-
-    if (isTemporary) {
-      createTemporaryView(sparkSession, analyzedPlan)
-    } else {
-      // Adds default database for permanent table if it doesn't exist, so that tableExists()
-      // only check permanent tables.
-      val database = name.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-      val qualifiedName = name.copy(database = Option(database))
-
-      if (sessionState.catalog.tableExists(qualifiedName)) {
-        val tableMetadata = sessionState.catalog.getTableMetadata(qualifiedName)
-        if (allowExisting) {
-          // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
-          // already exists.
-        } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
-          throw new AnalysisException(s"$qualifiedName is not a view")
-        } else if (replace) {
-          // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
-          sessionState.catalog.alterTable(prepareTable(sparkSession, analyzedPlan))
-        } else {
-          // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
-          // exists.
-          throw new AnalysisException(
-            s"View $qualifiedName already exists. If you want to update the 
view definition, " +
-              "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
-        }
-      } else {
-        // Create the view if it doesn't exist.
-        sessionState.catalog.createTable(
-          prepareTable(sparkSession, analyzedPlan), ignoreIfExists = false)
-      }
-    }
-    Seq.empty[Row]
-  }
-
-  private def createTemporaryView(sparkSession: SparkSession, analyzedPlan: LogicalPlan): Unit = {
-    val catalog = sparkSession.sessionState.catalog
 
-    // Projects column names to alias names
-    val logicalPlan = if (userSpecifiedColumns.isEmpty) {
+    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
       analyzedPlan
     } else {
       val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, _)) => Alias(attr, colName)()
+        case (attr, (colName, None)) => Alias(attr, colName)()
+        case (attr, (colName, Some(colComment))) =>
+          val meta = new MetadataBuilder().putString("comment", colComment).build()
+          Alias(attr, colName)(explicitMetadata = Some(meta))
       }
       sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
     }
 
-    catalog.createTempView(name.table, logicalPlan, replace)
+    val catalog = sparkSession.sessionState.catalog
+    if (viewType == LocalTempView) {
+      catalog.createTempView(name.table, aliasedPlan, overrideIfExists = replace)
+    } else if (viewType == GlobalTempView) {
+      catalog.createGlobalTempView(name.table, aliasedPlan, overrideIfExists = replace)
+    } else if (catalog.tableExists(name)) {
+      val tableMetadata = catalog.getTableMetadata(name)
+      if (allowExisting) {
+        // Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
+        // already exists.
+      } else if (tableMetadata.tableType != CatalogTableType.VIEW) {
+        throw new AnalysisException(s"$name is not a view")
+      } else if (replace) {
+        // Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
+        catalog.alterTable(prepareTable(sparkSession, aliasedPlan))
+      } else {
+        // Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
+        // exists.
+        throw new AnalysisException(
+          s"View $name already exists. If you want to update the view 
definition, " +
+            "please use ALTER VIEW AS or CREATE OR REPLACE VIEW AS")
+      }
+    } else {
+      // Create the view if it doesn't exist.
+      catalog.createTable(prepareTable(sparkSession, aliasedPlan), ignoreIfExists = false)
+    }
+    Seq.empty[Row]
   }
 
   /**
    * Returns a [[CatalogTable]] that can be saved in the catalog. This method canonicalizes
    * the SQL based on the analyzed plan, and also creates the proper schema for the view.
    */
-  private def prepareTable(sparkSession: SparkSession, analyzedPlan: LogicalPlan): CatalogTable = {
-    val aliasedPlan = if (userSpecifiedColumns.isEmpty) {
-      analyzedPlan
-    } else {
-      val projectList = analyzedPlan.output.zip(userSpecifiedColumns).map {
-        case (attr, (colName, _)) => Alias(attr, colName)()
-      }
-      sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
-    }
-
+  private def prepareTable(sparkSession: SparkSession, aliasedPlan: LogicalPlan): CatalogTable = {
     val viewSQL: String = new SQLBuilder(aliasedPlan).toSQL
 
     // Validate the view SQL - make sure we can parse it and analyze it.
@@ -176,19 +188,11 @@ case class CreateViewCommand(
         throw new RuntimeException(s"Failed to analyze the canonicalized SQL: $viewSQL", e)
     }
 
-    val viewSchema = if (userSpecifiedColumns.isEmpty) {
-      aliasedPlan.schema
-    } else {
-      StructType(aliasedPlan.schema.zip(userSpecifiedColumns).map {
-        case (field, (_, comment)) => comment.map(field.withComment).getOrElse(field)
-      })
-    }
-
     CatalogTable(
       identifier = name,
       tableType = CatalogTableType.VIEW,
       storage = CatalogStorageFormat.empty,
-      schema = viewSchema,
+      schema = aliasedPlan.schema,
       properties = properties,
       viewOriginalText = originalText,
       viewText = Some(viewSQL),
@@ -222,8 +226,8 @@ case class AlterViewAsCommand(
     qe.assertAnalyzed()
     val analyzedPlan = qe.analyzed
 
-    if (session.sessionState.catalog.isTemporaryTable(name)) {
-      session.sessionState.catalog.createTempView(name.table, analyzedPlan, overrideIfExists = true)
+    if (session.sessionState.catalog.alterTempViewDefinition(name, analyzedPlan)) {
+      // A local/global temp view has been altered; we are done.
     } else {
       alterPermanentView(session, analyzedPlan)
     }
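
A short usage sketch of the new alterTempViewDefinition path, assuming the default global_temp database name; the view name is illustrative:

    spark.sql("CREATE GLOBAL TEMP VIEW gv AS SELECT 1")
    // ALTER VIEW AS now updates a local/global temp view in place.
    spark.sql("ALTER VIEW global_temp.gv AS SELECT 2")
    spark.table("global_temp.gv")   // now returns 2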

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index fa95af2..59fb48f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -40,16 +40,20 @@ case class CreateTable(
   override def innerChildren: Seq[QueryPlan[_]] = query.toSeq
 }
 
+/**
+ * Create or replace a local/global temporary view with the given data source.
+ */
 case class CreateTempViewUsing(
     tableIdent: TableIdentifier,
     userSpecifiedSchema: Option[StructType],
     replace: Boolean,
+    global: Boolean,
     provider: String,
     options: Map[String, String]) extends RunnableCommand {
 
   if (tableIdent.database.isDefined) {
     throw new AnalysisException(
-      s"Temporary table '$tableIdent' should not have specified a database")
+      s"Temporary view '$tableIdent' should not have specified a database")
   }
 
   def run(sparkSession: SparkSession): Seq[Row] = {
@@ -58,10 +62,16 @@ case class CreateTempViewUsing(
       userSpecifiedSchema = userSpecifiedSchema,
       className = provider,
       options = options)
-    sparkSession.sessionState.catalog.createTempView(
-      tableIdent.table,
-      Dataset.ofRows(sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan,
-      replace)
+
+    val catalog = sparkSession.sessionState.catalog
+    val viewDefinition = Dataset.ofRows(
+      sparkSession, LogicalRelation(dataSource.resolveRelation())).logicalPlan
+
+    if (global) {
+      catalog.createGlobalTempView(tableIdent.table, viewDefinition, replace)
+    } else {
+      catalog.createTempView(tableIdent.table, viewDefinition, replace)
+    }
 
     Seq.empty[Row]
   }
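
A hedged sketch of the USING variant this command now supports; the path and view name are invented:

    spark.sql(
      """CREATE GLOBAL TEMPORARY VIEW gsrc
        |USING parquet
        |OPTIONS (path '/tmp/some/data')""".stripMargin)
    spark.table("global_temp.gsrc")   // visible from any session, assuming the default DB name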

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index e412e1b..c05bda3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -94,20 +94,19 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    */
   @throws[AnalysisException]("database does not exist")
   override def listTables(dbName: String): Dataset[Table] = {
-    requireDatabaseExists(dbName)
     val tables = sessionCatalog.listTables(dbName).map(makeTable)
     CatalogImpl.makeDataset(tables, sparkSession)
   }
 
   private def makeTable(tableIdent: TableIdentifier): Table = {
     val metadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
-    val database = metadata.identifier.database
+    val isTemp = sessionCatalog.isTemporaryTable(tableIdent)
     new Table(
       name = tableIdent.table,
-      database = database.orNull,
+      database = metadata.identifier.database.orNull,
       description = metadata.comment.orNull,
-      tableType = if (database.isEmpty) "TEMPORARY" else metadata.tableType.name,
-      isTemporary = database.isEmpty)
+      tableType = if (isTemp) "TEMPORARY" else metadata.tableType.name,
+      isTemporary = isTemp)
   }
 
   /**
@@ -365,7 +364,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   /**
-   * Drops the temporary view with the given view name in the catalog.
+   * Drops the local temporary view with the given view name in the catalog.
    * If the view has been cached/persisted before, it's also unpersisted.
    *
    * @param viewName the name of the view to be dropped.
@@ -380,6 +379,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   /**
+   * Drops the global temporary view with the given view name in the catalog.
+   * If the view has been cached/persisted before, it's also unpersisted.
+   *
+   * @param viewName the name of the view to be dropped.
+   * @group ddl_ops
+   * @since 2.1.0
+   */
+  override def dropGlobalTempView(viewName: String): Boolean = {
+    sparkSession.sessionState.catalog.getGlobalTempView(viewName).exists { viewDef =>
+      sparkSession.sharedState.cacheManager.uncacheQuery(Dataset.ofRows(sparkSession, viewDef))
+      sessionCatalog.dropGlobalTempView(viewName)
+    }
+  }
+
+  /**
    * Returns true if the table is currently cached in-memory.
    *
    * @group cachemgmt
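
A brief sketch of the new Catalog API; the view name is illustrative:

    Seq(1 -> "a").toDF("i", "j").createGlobalTempView("gv")
    spark.catalog.dropGlobalTempView("gv")   // returns true only if the view existed and was dropped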

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index 9f7d001..8759dfe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -95,6 +95,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
    */
   lazy val catalog = new SessionCatalog(
     sparkSession.sharedState.externalCatalog,
+    sparkSession.sharedState.globalTempViewManager,
     functionResourceLoader,
     functionRegistry,
     conf,

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 6387f01..c555a43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -22,11 +22,11 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, GlobalTempViewManager, InMemoryCatalog}
 import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
 import org.apache.spark.util.{MutableURLClassLoader, Utils}
@@ -37,39 +37,14 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
  */
 private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
 
-  /**
-   * Class for caching query results reused in future executions.
-   */
-  val cacheManager: CacheManager = new CacheManager
-
-  /**
-   * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
-   */
-  val listener: SQLListener = createListenerAndUI(sparkContext)
-
+  // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
+  // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
   {
     val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
     if (configFile != null) {
       sparkContext.hadoopConfiguration.addResource(configFile)
     }
-  }
-
-  /**
-   * A catalog that interacts with external systems.
-   */
-  lazy val externalCatalog: ExternalCatalog =
-    SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
-      SharedState.externalCatalogClassName(sparkContext.conf),
-      sparkContext.conf,
-      sparkContext.hadoopConfiguration)
-
-  /**
-   * A classloader used to load all user-added jar.
-   */
-  val jarClassLoader = new NonClosableMutableURLClassLoader(
-    org.apache.spark.util.Utils.getContextOrSparkClassLoader)
 
-  {
     // Set the Hive metastore warehouse path to the one we use
     val tempConf = new SQLConf
     sparkContext.conf.getAll.foreach { case (k, v) => tempConf.setConfString(k, v) }
@@ -94,6 +69,48 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging {
   }
 
   /**
+   * Class for caching query results reused in future executions.
+   */
+  val cacheManager: CacheManager = new CacheManager
+
+  /**
+   * A listener for SQL-specific [[org.apache.spark.scheduler.SparkListenerEvent]]s.
+   */
+  val listener: SQLListener = createListenerAndUI(sparkContext)
+
+  /**
+   * A catalog that interacts with external systems.
+   */
+  val externalCatalog: ExternalCatalog =
+    SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
+      SharedState.externalCatalogClassName(sparkContext.conf),
+      sparkContext.conf,
+      sparkContext.hadoopConfiguration)
+
+  /**
+   * A manager for global temporary views.
+   */
+  val globalTempViewManager = {
+    // The system preserved database should not exist in the metastore. However it's hard to
+    // guarantee it for every session, because case sensitivity differs. Here we always lowercase
+    // it to make our life easier.
+    val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase
+    if (externalCatalog.databaseExists(globalTempDB)) {
+      throw new SparkException(
+        s"$globalTempDB is a system preserved database, please rename your 
existing database " +
+          "to resolve the name conflict, or set a different value for " +
+          s"${GLOBAL_TEMP_DATABASE.key}, and launch your Spark application 
again.")
+    }
+    new GlobalTempViewManager(globalTempDB)
+  }
+
+  /**
+   * A classloader used to load all user-added jars.
+   */
+  val jarClassLoader = new NonClosableMutableURLClassLoader(
+    org.apache.spark.util.Utils.getContextOrSparkClassLoader)
+
+  /**
+   * Create a SQLListener then add it into SparkContext, and create a SQLTab if there is SparkUI.
    */
   private def createListenerAndUI(sc: SparkContext): SQLListener = {
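
For context, a hedged sketch of overriding the global temp database name before the check above runs; it assumes GLOBAL_TEMP_DATABASE is backed by the key "spark.sql.globalTempDatabase", and the database name is invented:

    import org.apache.spark.sql.SparkSession

    // Assumption: GLOBAL_TEMP_DATABASE is exposed as "spark.sql.globalTempDatabase".
    val spark = SparkSession.builder()
      .config("spark.sql.globalTempDatabase", "my_global_temp")
      .getOrCreate()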

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
index 001c1a1..2b35db4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -88,11 +88,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
     df.createOrReplaceTempView("listtablessuitetable")
     assert(
       sqlContext.tables().filter("tableName = 'listtablessuitetable'").collect().toSeq ==
-      Row("listtablessuitetable", true) :: Nil)
+      Row("", "listtablessuitetable", true) :: Nil)
 
     assert(
       sqlContext.sql("SHOW tables").filter("tableName = 
'listtablessuitetable'").collect().toSeq ==
-      Row("listtablessuitetable", true) :: Nil)
+      Row("", "listtablessuitetable", true) :: Nil)
 
     sqlContext.sessionState.catalog.dropTable(
       TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge 
= false)
@@ -105,11 +105,11 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
     df.createOrReplaceTempView("listtablessuitetable")
     assert(
       sqlContext.tables("default").filter("tableName = 
'listtablessuitetable'").collect().toSeq ==
-      Row("listtablessuitetable", true) :: Nil)
+      Row("", "listtablessuitetable", true) :: Nil)
 
     assert(
       sqlContext.sql("show TABLES in default").filter("tableName = 
'listtablessuitetable'")
-        .collect().toSeq == Row("listtablessuitetable", true) :: Nil)
+        .collect().toSeq == Row("", "listtablessuitetable", true) :: Nil)
 
     sqlContext.sessionState.catalog.dropTable(
       TableIdentifier("listtablessuitetable"), ignoreIfNotExists = true, purge 
= false)
@@ -122,7 +122,8 @@ class SQLContextSuite extends SparkFunSuite with SharedSparkContext {
     df.createOrReplaceTempView("listtablessuitetable")
 
     val expectedSchema = StructType(
-      StructField("tableName", StringType, false) ::
+      StructField("database", StringType, false) ::
+        StructField("tableName", StringType, false) ::
         StructField("isTemporary", BooleanType, false) :: Nil)
 
     Seq(sqlContext.tables(), sqlContext.sql("SHOW TABLes")).foreach {

http://git-wip-us.apache.org/repos/asf/spark/blob/23ddff4b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
new file mode 100644
index 0000000..391bcb8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/GlobalTempViewSuite.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalog.Table
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class GlobalTempViewSuite extends QueryTest with SharedSQLContext {
+  import testImplicits._
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    globalTempDB = spark.sharedState.globalTempViewManager.database
+  }
+
+  private var globalTempDB: String = _
+
+  test("basic semantic") {
+    sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 'a'")
+
+    // If there is no database in table name, we should try local temp view first, if not found,
+    // try table/view in current database, which is "default" in this case. So we expect
+    // NoSuchTableException here.
+    intercept[NoSuchTableException](spark.table("src"))
+
+    // Use qualified name to refer to the global temp view explicitly.
+    checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+
+    // Table name without database will never refer to a global temp view.
+    intercept[NoSuchTableException](sql("DROP VIEW src"))
+
+    sql(s"DROP VIEW $globalTempDB.src")
+    // The global temp view should be dropped successfully.
+    intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+
+    // We can also use Dataset API to create global temp view
+    Seq(1 -> "a").toDF("i", "j").createGlobalTempView("src")
+    checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+
+    // Use qualified name to rename a global temp view.
+    sql(s"ALTER VIEW $globalTempDB.src RENAME TO src2")
+    intercept[NoSuchTableException](spark.table(s"$globalTempDB.src"))
+    checkAnswer(spark.table(s"$globalTempDB.src2"), Row(1, "a"))
+
+    // Use qualified name to alter a global temp view.
+    sql(s"ALTER VIEW $globalTempDB.src2 AS SELECT 2, 'b'")
+    checkAnswer(spark.table(s"$globalTempDB.src2"), Row(2, "b"))
+
+    // We can also use Catalog API to drop global temp view
+    spark.catalog.dropGlobalTempView("src2")
+    intercept[NoSuchTableException](spark.table(s"$globalTempDB.src2"))
+  }
+
+  test("global temp view is shared among all sessions") {
+    try {
+      sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 2")
+      checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, 2))
+      val newSession = spark.newSession()
+      checkAnswer(newSession.table(s"$globalTempDB.src"), Row(1, 2))
+    } finally {
+      spark.catalog.dropGlobalTempView("src")
+    }
+  }
+
+  test("global temp view database should be preserved") {
+    val e = intercept[AnalysisException](sql(s"CREATE DATABASE $globalTempDB"))
+    assert(e.message.contains("system preserved database"))
+
+    val e2 = intercept[AnalysisException](sql(s"USE $globalTempDB"))
+    assert(e2.message.contains("system preserved database"))
+  }
+
+  test("CREATE GLOBAL TEMP VIEW USING") {
+    withTempPath { path =>
+      try {
+        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+        sql(s"CREATE GLOBAL TEMP VIEW src USING parquet OPTIONS (PATH 
'${path.getAbsolutePath}')")
+        checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a"))
+        sql(s"INSERT INTO $globalTempDB.src SELECT 2, 'b'")
+        checkAnswer(spark.table(s"$globalTempDB.src"), Row(1, "a") :: Row(2, "b") :: Nil)
+      } finally {
+        spark.catalog.dropGlobalTempView("src")
+      }
+    }
+  }
+
+  test("CREATE TABLE LIKE should work for global temp view") {
+    try {
+      sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1 AS a, '2' AS b")
+      sql(s"CREATE TABLE cloned LIKE ${globalTempDB}.src")
+      val tableMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("cloned"))
+      assert(tableMeta.schema == new StructType().add("a", "int", false).add("b", "string", false))
+    } finally {
+      spark.catalog.dropGlobalTempView("src")
+      sql("DROP TABLE default.cloned")
+    }
+  }
+
+  test("list global temp views") {
+    try {
+      sql("CREATE GLOBAL TEMP VIEW v1 AS SELECT 3, 4")
+      sql("CREATE TEMP VIEW v2 AS SELECT 1, 2")
+
+      checkAnswer(sql(s"SHOW TABLES IN $globalTempDB"),
+        Row(globalTempDB, "v1", true) ::
+        Row("", "v2", true) :: Nil)
+
+      assert(spark.catalog.listTables(globalTempDB).collect().toSeq.map(_.name) == Seq("v1", "v2"))
+    } finally {
+      spark.catalog.dropTempView("v1")
+      spark.catalog.dropGlobalTempView("v2")
+    }
+  }
+
+  test("should lookup global temp view if and only if global temp db is 
specified") {
+    try {
+      sql("CREATE GLOBAL TEMP VIEW same_name AS SELECT 3, 4")
+      sql("CREATE TEMP VIEW same_name AS SELECT 1, 2")
+
+      checkAnswer(sql("SELECT * FROM same_name"), Row(1, 2))
+
+      // we never lookup global temp views if database is not specified in table name
+      spark.catalog.dropTempView("same_name")
+      intercept[AnalysisException](sql("SELECT * FROM same_name"))
+
+      // Use qualified name to lookup a global temp view.
+      checkAnswer(sql(s"SELECT * FROM $globalTempDB.same_name"), Row(3, 4))
+    } finally {
+      spark.catalog.dropTempView("same_name")
+      spark.catalog.dropGlobalTempView("same_name")
+    }
+  }
+
+  test("public Catalog should recognize global temp view") {
+    try {
+      sql("CREATE GLOBAL TEMP VIEW src AS SELECT 1, 2")
+
+      assert(spark.catalog.tableExists(globalTempDB, "src"))
+      assert(spark.catalog.getTable(globalTempDB, "src").toString == new Table(
+        name = "src",
+        database = globalTempDB,
+        description = null,
+        tableType = "TEMPORARY",
+        isTemporary = true).toString)
+    } finally {
+      spark.catalog.dropGlobalTempView("src")
+    }
+  }
+}
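
As a closing illustration, a hedged sketch of the cross-session behavior the suite verifies, assuming the default global_temp database name and an invented view name:

    spark.sql("CREATE GLOBAL TEMP VIEW shared AS SELECT 42 AS answer")
    val other = spark.newSession()
    other.table("global_temp.shared")          // the view is visible in the new session
    other.catalog.dropGlobalTempView("shared") // dropping from any session removes it for all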

