Repository: spark
Updated Branches:
  refs/heads/branch-1.2 f4160324c -> 31a6d4fed


[SPARK-4769] [SQL] CTAS does not work when reading from temporary tables

This is a code refactor and follow-up for #2570.

Author: Cheng Hao <[email protected]>

Closes #3336 from chenghao-intel/createtbl and squashes the following commits:

3563142 [Cheng Hao] remove the unused variable
e215187 [Cheng Hao] eliminate the compiling warning
4f97f14 [Cheng Hao] fix bug in unittest
5d58812 [Cheng Hao] revert the API changes
b85b620 [Cheng Hao] fix the regression of temp table not found in CTAS

(cherry picked from commit 51b1fe1426ffecac6c4644523633ea1562ff9a4e)
Signed-off-by: Michael Armbrust <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/31a6d4fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/31a6d4fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/31a6d4fe

Branch: refs/heads/branch-1.2
Commit: 31a6d4fede28d46cd379f788678cc33b0b982d14
Parents: f416032
Author: Cheng Hao <[email protected]>
Authored: Mon Dec 8 17:39:12 2014 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Mon Dec 8 17:39:56 2014 -0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 26 ++++++++++++++++++--
 .../apache/spark/sql/hive/HiveStrategies.scala  | 14 ++++++++---
 .../hive/execution/CreateTableAsSelect.scala    | 16 ++++--------
 .../sql/hive/execution/SQLQuerySuite.scala      |  9 +++++++
 4 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/31a6d4fe/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 91a1577..6086563 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -254,15 +254,37 @@ private[hive] class HiveMetastoreCatalog(hive: 
HiveContext) extends Catalog with
    * For example, because of a CREATE TABLE X AS statement.
    */
   object CreateTables extends Rule[LogicalPlan] {
+    import org.apache.hadoop.hive.ql.Context
+    import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}
+
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
-      case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
+      case CreateTableAsSelect(db, tableName, child, allowExisting, 
Some(extra: ASTNode)) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
         val databaseName = 
dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
-        CreateTableAsSelect(Some(databaseName), tableName, child, 
allowExisting, extra)
+        // Get the CreateTableDesc from Hive SemanticAnalyzer
+        val desc: Option[CreateTableDesc] = if 
(tableExists(Some(databaseName), tblName)) {
+          None
+        } else {
+          val sa = new SemanticAnalyzer(hive.hiveconf) {
+            override def analyzeInternal(ast: ASTNode) {
+              // A hack to intercept the SemanticAnalyzer.analyzeInternal,
+              // to ignore the SELECT clause of the CTAS
+              val method = classOf[SemanticAnalyzer].getDeclaredMethod(
+                "analyzeCreateTable", classOf[ASTNode], classOf[QB])
+              method.setAccessible(true)
+              method.invoke(this, ast, this.getQB)
+            }
+          }
+
+          sa.analyze(extra, new Context(hive.hiveconf))
+          Some(sa.getQB().getTableDesc)
+        }
+
+        CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, 
desc)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/31a6d4fe/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index edf291f..5f02e95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.hive.ql.parse.ASTNode
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -181,13 +182,20 @@ private[hive] trait HiveStrategies {
         execution.InsertIntoHiveTable(
           table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case logical.CreateTableAsSelect(
-             Some(database), tableName, child, allowExisting, Some(extra: 
ASTNode)) =>
-        CreateTableAsSelect(
+             Some(database), tableName, child, allowExisting, Some(desc: 
CreateTableDesc)) =>
+        execution.CreateTableAsSelect(
           database,
           tableName,
           child,
           allowExisting,
-          extra) :: Nil
+          Some(desc)) :: Nil
+      case logical.CreateTableAsSelect(Some(database), tableName, child, 
allowExisting, None) =>
+        execution.CreateTableAsSelect(
+          database,
+          tableName,
+          child,
+          allowExisting,
+          None) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/31a6d4fe/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 3d24d87..b83689c 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.hadoop.hive.ql.Context
-import org.apache.hadoop.hive.ql.parse.{SemanticAnalyzer, ASTNode}
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
@@ -35,8 +35,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
  * @param query the query whose result will be insert into the new relation
  * @param allowExisting allow continue working if it's already exists, 
otherwise
  *                      raise exception
- * @param extra the extra information for this Operator, it should be the
- *              ASTNode object for extracting the CreateTableDesc.
+ * @param desc the CreateTableDesc, which may contain serde, storage handler etc.
 
  */
 @Experimental
@@ -45,7 +44,7 @@ case class CreateTableAsSelect(
     tableName: String,
     query: LogicalPlan,
     allowExisting: Boolean,
-    extra: ASTNode) extends LeafNode with Command {
+    desc: Option[CreateTableDesc]) extends LeafNode with Command {
 
   def output = Seq.empty
 
@@ -53,13 +52,8 @@ case class CreateTableAsSelect(
 
   // A lazy computing of the metastoreRelation
   private[this] lazy val metastoreRelation: MetastoreRelation = {
-    // Get the CreateTableDesc from Hive SemanticAnalyzer
-    val sa = new SemanticAnalyzer(sc.hiveconf)
-
-    sa.analyze(extra, new Context(sc.hiveconf))
-    val desc = sa.getQB().getTableDesc
     // Create Hive Table
-    sc.catalog.createTable(database, tableName, query.output, allowExisting, 
Some(desc))
+    sc.catalog.createTable(database, tableName, query.output, allowExisting, 
desc)
 
     // Get the Metastore Relation
     sc.catalog.lookupRelation(Some(database), tableName, None) match {

http://git-wip-us.apache.org/repos/asf/spark/blob/31a6d4fe/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e9b1943..b341eae 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -119,6 +119,15 @@ class SQLQuerySuite extends QueryTest {
     checkAnswer(
       sql("SELECT f1.f2.f3 FROM nested"),
       1)
+    checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"),
+      Seq.empty[Row])
+    checkAnswer(
+      sql("SELECT * FROM test_ctas_1234"),
+      sql("SELECT * FROM nested").collect().toSeq)
+
+    intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
+      sql("CREATE TABLE test_ctas_12345 AS SELECT * from notexists").collect()
+    }
   }
 
   test("test CTAS") {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to