Repository: spark
Updated Branches:
  refs/heads/branch-1.2 572300ba8 -> 6104754f7


[SPARK-4152] [SQL] Avoid data change in CTAS while table already existed

CREATE TABLE t1 (a String);
CREATE TABLE t1 AS SELECT key FROM src; -- throws an exception
CREATE TABLE IF NOT EXISTS t1 AS SELECT key FROM src; -- expected to do nothing,
but currently it overwrites t1, which is incorrect.

Author: Cheng Hao <hao.ch...@intel.com>

Closes #3013 from chenghao-intel/ctas_unittest and squashes the following 
commits:

194113e [Cheng Hao] fix bug in CTAS when table already existed

(cherry picked from commit e83f13e8d37ca33f4e183e977d077221b90c6025)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6104754f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6104754f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6104754f

Branch: refs/heads/branch-1.2
Commit: 6104754f711da9eb0c09daf377bcd750d2d23f8a
Parents: 572300b
Author: Cheng Hao <hao.ch...@intel.com>
Authored: Mon Nov 3 13:59:43 2014 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Nov 3 14:00:06 2014 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Catalog.scala   | 22 ++++++++++++++++++++
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  6 ++++++
 .../hive/execution/CreateTableAsSelect.scala    | 12 ++++++++++-
 .../sql/hive/execution/SQLQuerySuite.scala      |  9 ++++++--
 4 files changed, 46 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6104754f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 2059a91..0415d74 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -28,6 +28,8 @@ trait Catalog {
 
   def caseSensitive: Boolean
 
+  def tableExists(db: Option[String], tableName: String): Boolean
+
   def lookupRelation(
     databaseName: Option[String],
     tableName: String,
@@ -82,6 +84,14 @@ class SimpleCatalog(val caseSensitive: Boolean) extends 
Catalog {
     tables.clear()
   }
 
+  override def tableExists(db: Option[String], tableName: String): Boolean = {
+    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+    tables.get(tblName) match {
+      case Some(_) => true
+      case None => false
+    }
+  }
+
   override def lookupRelation(
       databaseName: Option[String],
       tableName: String,
@@ -107,6 +117,14 @@ trait OverrideCatalog extends Catalog {
   // TODO: This doesn't work when the database changes...
   val overrides = new mutable.HashMap[(Option[String],String), LogicalPlan]()
 
+  abstract override def tableExists(db: Option[String], tableName: String): 
Boolean = {
+    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
+    overrides.get((dbName, tblName)) match {
+      case Some(_) => true
+      case None => super.tableExists(db, tableName)
+    }
+  }
+
   abstract override def lookupRelation(
     databaseName: Option[String],
     tableName: String,
@@ -149,6 +167,10 @@ object EmptyCatalog extends Catalog {
 
   val caseSensitive: Boolean = true
 
+  def tableExists(db: Option[String], tableName: String): Boolean = {
+    throw new UnsupportedOperationException
+  }
+
   def lookupRelation(
     databaseName: Option[String],
     tableName: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/6104754f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 096b4a0..0baf4c9 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -57,6 +57,12 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) 
extends Catalog with
 
   val caseSensitive: Boolean = false
 
+  def tableExists(db: Option[String], tableName: String): Boolean = {
+    val (databaseName, tblName) = processDatabaseAndTableName(
+      db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
+    client.getTable(databaseName, tblName, false) != null
+  }
+
   def lookupRelation(
       db: Option[String],
       tableName: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/6104754f/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 2fce414..3d24d87 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -71,7 +71,17 @@ case class CreateTableAsSelect(
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
     // processing.
-    sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, 
true)).toRdd
+    if (sc.catalog.tableExists(Some(database), tableName)) {
+      if (allowExisting) {
+        // table already exists, will do nothing, to keep consistent with Hive
+      } else {
+        throw
+          new 
org.apache.hadoop.hive.metastore.api.AlreadyExistsException(s"$database.$tableName")
+      }
+    } else {
+      sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, 
true)).toRdd
+    }
+
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/6104754f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 76a0ec0..e9b1943 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -56,7 +56,7 @@ class SQLQuerySuite extends QueryTest {
     sql(
       """CREATE TABLE IF NOT EXISTS ctas4 AS
         | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect
-    // expect the string => integer for field key cause the table ctas4 
already existed.
+    // do nothing cause the table ctas4 already existed.
     sql(
       """CREATE TABLE IF NOT EXISTS ctas4 AS
         | SELECT key, value FROM src ORDER BY key, 
value""".stripMargin).collect
@@ -78,9 +78,14 @@ class SQLQuerySuite extends QueryTest {
           SELECT key, value
           FROM src
           ORDER BY key, value""").collect().toSeq)
+    intercept[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] {
+      sql(
+        """CREATE TABLE ctas4 AS
+          | SELECT key, value FROM src ORDER BY key, 
value""".stripMargin).collect
+    }
     checkAnswer(
       sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
-      sql("SELECT CAST(key AS int) k, value FROM src ORDER BY k, 
value").collect().toSeq)
+      sql("SELECT key, value FROM ctas4 LIMIT 1").collect().toSeq)
 
     checkExistence(sql("DESC EXTENDED ctas2"), true,
       "name:key", "type:string", "name:value", "ctas2",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to