Repository: spark
Updated Branches:
  refs/heads/branch-2.0 655d88293 -> c55a39c97


[SPARK-15279][SQL] Catch conflicting SerDe when creating table

## What changes were proposed in this pull request?

The user may do something like:
```
CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS PARQUET
CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS ... SERDE 'myserde'
CREATE TABLE my_tab ROW FORMAT DELIMITED ... STORED AS ORC
CREATE TABLE my_tab ROW FORMAT DELIMITED ... STORED AS ... SERDE 'myserde'
```
None of these should be allowed because the SerDe's conflict. As of this patch:
- `ROW FORMAT DELIMITED` is only compatible with `TEXTFILE`
- `ROW FORMAT SERDE` is only compatible with `TEXTFILE`, `RCFILE` and 
`SEQUENCEFILE`

## How was this patch tested?

New tests in `DDLCommandSuite`.

Author: Andrew Or <and...@databricks.com>

Closes #13068 from andrewor14/row-format-conflict.

(cherry picked from commit 2585d2b322f3b6b85a0a12ddf7dcde957453000d)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c55a39c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c55a39c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c55a39c9

Branch: refs/heads/branch-2.0
Commit: c55a39c973ca617122e5f8af40316e259399b9c9
Parents: 655d882
Author: Andrew Or <and...@databricks.com>
Authored: Mon May 23 11:55:03 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Mon May 23 11:55:13 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  4 +-
 .../spark/sql/execution/SparkSqlParser.scala    | 60 +++++++++++--
 .../sql/execution/command/DDLCommandSuite.scala | 94 ++++++++++++++++----
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |  4 +-
 4 files changed, 129 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c55a39c9/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 848c59e..8ea8f76 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -267,8 +267,8 @@ createFileFormat
     ;
 
 fileFormat
-    : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING (SERDE 
serdeCls=STRING)?         #tableFileFormat
-    | identifier                                                               
            #genericFileFormat
+    : INPUTFORMAT inFmt=STRING OUTPUTFORMAT outFmt=STRING    #tableFileFormat
+    | identifier                                             #genericFileFormat
     ;
 
 storageHandler

http://git-wip-us.apache.org/repos/asf/spark/blob/c55a39c9/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index c517b8b..6e4af95 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -796,14 +796,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
    *
    * Expected format:
    * {{{
-   *   CREATE [TEMPORARY] [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
-   *   [(col1 data_type [COMMENT col_comment], ...)]
+   *   CREATE [EXTERNAL] TABLE [IF NOT EXISTS] [db_name.]table_name
+   *   [(col1[:] data_type [COMMENT col_comment], ...)]
    *   [COMMENT table_comment]
-   *   [PARTITIONED BY (col3 data_type [COMMENT col_comment], ...)]
-   *   [CLUSTERED BY (col1, ...) [SORTED BY (col1 [ASC|DESC], ...)] INTO 
num_buckets BUCKETS]
-   *   [SKEWED BY (col1, col2, ...) ON ((col_value, col_value, ...), ...) 
[STORED AS DIRECTORIES]]
+   *   [PARTITIONED BY (col2[:] data_type [COMMENT col_comment], ...)]
    *   [ROW FORMAT row_format]
-   *   [STORED AS file_format | STORED BY storage_handler_class [WITH 
SERDEPROPERTIES (...)]]
+   *   [STORED AS file_format]
    *   [LOCATION path]
    *   [TBLPROPERTIES (property_name=property_value, ...)]
    *   [AS select_statement];
@@ -849,6 +847,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         compressed = false,
         serdeProperties = Map())
     }
+    validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
     val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
       .getOrElse(EmptyStorageFormat)
     val rowStorage = 
Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
@@ -905,6 +904,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
   /**
    * Create a [[CatalogStorageFormat]] for creating tables.
+   *
+   * Format: STORED AS ...
    */
   override def visitCreateFileFormat(
       ctx: CreateFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
@@ -932,9 +933,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
     EmptyStorageFormat.copy(
       inputFormat = Option(string(ctx.inFmt)),
-      outputFormat = Option(string(ctx.outFmt)),
-      serde = Option(ctx.serdeCls).map(string)
-    )
+      outputFormat = Option(string(ctx.outFmt)))
   }
 
   /**
@@ -1019,6 +1018,49 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
   }
 
   /**
+   * Throw a [[ParseException]] if the user specified incompatible SerDes 
through ROW FORMAT
+   * and STORED AS.
+   *
+   * The following are allowed. Anything else is not:
+   *   ROW FORMAT SERDE ... STORED AS [SEQUENCEFILE | RCFILE | TEXTFILE]
+   *   ROW FORMAT DELIMITED ... STORED AS TEXTFILE
+   *   ROW FORMAT ... STORED AS INPUTFORMAT ... OUTPUTFORMAT ...
+   */
+  private def validateRowFormatFileFormat(
+      rowFormatCtx: RowFormatContext,
+      createFileFormatCtx: CreateFileFormatContext,
+      parentCtx: ParserRuleContext): Unit = {
+    if (rowFormatCtx == null || createFileFormatCtx == null) {
+      return
+    }
+    (rowFormatCtx, createFileFormatCtx.fileFormat) match {
+      case (_, ffTable: TableFileFormatContext) => // OK
+      case (rfSerde: RowFormatSerdeContext, ffGeneric: 
GenericFileFormatContext) =>
+        ffGeneric.identifier.getText.toLowerCase match {
+          case ("sequencefile" | "textfile" | "rcfile") => // OK
+          case fmt =>
+            throw operationNotAllowed(
+              s"ROW FORMAT SERDE is incompatible with format '$fmt', which 
also specifies a serde",
+              parentCtx)
+        }
+      case (rfDelimited: RowFormatDelimitedContext, ffGeneric: 
GenericFileFormatContext) =>
+        ffGeneric.identifier.getText.toLowerCase match {
+          case "textfile" => // OK
+          case fmt => throw operationNotAllowed(
+            s"ROW FORMAT DELIMITED is only compatible with 'textfile', not 
'$fmt'", parentCtx)
+        }
+      case _ =>
+        // should never happen
+        def str(ctx: ParserRuleContext): String = {
+          (0 until ctx.getChildCount).map { i => ctx.getChild(i).getText 
}.mkString(" ")
+        }
+        throw operationNotAllowed(
+          s"Unexpected combination of ${str(rowFormatCtx)} and 
${str(createFileFormatCtx)}",
+          parentCtx)
+    }
+  }
+
+  /**
    * Create or replace a view. This creates a [[CreateViewCommand]] command.
    *
    * For example:

http://git-wip-us.apache.org/repos/asf/spark/blob/c55a39c9/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 54f98a6..eab1f55 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.command
 
+import scala.reflect.{classTag, ClassTag}
+
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, 
FunctionResource}
 import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
@@ -25,9 +27,10 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.datasources.{BucketSpec, 
CreateTableUsing}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
+
 // TODO: merge this with DDLSuite (SPARK-14441)
 class DDLCommandSuite extends PlanTest {
   private val parser = new SparkSqlParser(new SQLConf)
@@ -40,6 +43,15 @@ class DDLCommandSuite extends PlanTest {
     containsThesePhrases.foreach { p => 
assert(e.getMessage.toLowerCase.contains(p.toLowerCase)) }
   }
 
+  private def parseAs[T: ClassTag](query: String): T = {
+    parser.parsePlan(query) match {
+      case t: T => t
+      case other =>
+        fail(s"Expected to parse ${classTag[T].runtimeClass} from query," +
+          s"got ${other.getClass.getName}: $query")
+    }
+  }
+
   test("create database") {
     val sql =
       """
@@ -225,19 +237,69 @@ class DDLCommandSuite extends PlanTest {
     comparePlans(parsed4, expected4)
   }
 
+  test("create table - row format and table file format") {
+    val createTableStart = "CREATE TABLE my_tab ROW FORMAT"
+    val fileFormat = s"STORED AS INPUTFORMAT 'inputfmt' OUTPUTFORMAT 
'outputfmt'"
+    val query1 = s"$createTableStart SERDE 'anything' $fileFormat"
+    val query2 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' 
$fileFormat"
+
+    // No conflicting serdes here, OK
+    val parsed1 = parseAs[CreateTableCommand](query1)
+    assert(parsed1.table.storage.serde == Some("anything"))
+    assert(parsed1.table.storage.inputFormat == Some("inputfmt"))
+    assert(parsed1.table.storage.outputFormat == Some("outputfmt"))
+    val parsed2 = parseAs[CreateTableCommand](query2)
+    assert(parsed2.table.storage.serde.isEmpty)
+    assert(parsed2.table.storage.inputFormat == Some("inputfmt"))
+    assert(parsed2.table.storage.outputFormat == Some("outputfmt"))
+  }
+
+  test("create table - row format serde and generic file format") {
+    val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", 
"textfile")
+    val supportedSources = Set("sequencefile", "rcfile", "textfile")
+
+    allSources.foreach { s =>
+      val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS 
$s"
+      if (supportedSources.contains(s)) {
+        val ct = parseAs[CreateTableCommand](query)
+        val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+        assert(hiveSerde.isDefined)
+        assert(ct.table.storage.serde == Some("anything"))
+        assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
+        assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+      } else {
+        assertUnsupported(query, Seq("row format serde", "incompatible", s))
+      }
+    }
+  }
+
+  test("create table - row format delimited and generic file format") {
+    val allSources = Seq("parquet", "orc", "avro", "sequencefile", "rcfile", 
"textfile")
+    val supportedSources = Set("textfile")
+
+    allSources.foreach { s =>
+      val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED 
BY ' ' STORED AS $s"
+      if (supportedSources.contains(s)) {
+        val ct = parseAs[CreateTableCommand](query)
+        val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
+        assert(hiveSerde.isDefined)
+        assert(ct.table.storage.serde == hiveSerde.get.serde)
+        assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
+        assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+      } else {
+        assertUnsupported(query, Seq("row format delimited", "only compatible 
with 'textfile'", s))
+      }
+    }
+  }
+
   test("create external table - location must be specified") {
     assertUnsupported(
       sql = "CREATE EXTERNAL TABLE my_tab",
       containsThesePhrases = Seq("create external table", "location"))
     val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
-    parser.parsePlan(query) match {
-      case ct: CreateTableCommand =>
-        assert(ct.table.tableType == CatalogTableType.EXTERNAL)
-        assert(ct.table.storage.locationUri == Some("/something/anything"))
-      case other =>
-        fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," +
-          s"got ${other.getClass.getName}: $query")
-    }
+    val ct = parseAs[CreateTableCommand](query)
+    assert(ct.table.tableType == CatalogTableType.EXTERNAL)
+    assert(ct.table.storage.locationUri == Some("/something/anything"))
   }
 
   test("create table - property values must be set") {
@@ -252,14 +314,9 @@ class DDLCommandSuite extends PlanTest {
 
   test("create table - location implies external") {
     val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
-    parser.parsePlan(query) match {
-      case ct: CreateTableCommand =>
-        assert(ct.table.tableType == CatalogTableType.EXTERNAL)
-        assert(ct.table.storage.locationUri == Some("/something/anything"))
-      case other =>
-        fail(s"Expected to parse 
${classOf[CreateTableCommand].getClass.getName} from query," +
-            s"got ${other.getClass.getName}: $query")
-    }
+    val ct = parseAs[CreateTableCommand](query)
+    assert(ct.table.tableType == CatalogTableType.EXTERNAL)
+    assert(ct.table.storage.locationUri == Some("/something/anything"))
   }
 
   test("create table using - with partitioned by") {
@@ -551,8 +608,7 @@ class DDLCommandSuite extends PlanTest {
 
   test("alter table: set file format (not allowed)") {
     assertUnsupported(
-      "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' " +
-        "OUTPUTFORMAT 'test' SERDE 'test'")
+      "ALTER TABLE table_name SET FILEFORMAT INPUTFORMAT 'test' OUTPUTFORMAT 
'test'")
     assertUnsupported(
       "ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') " +
         "SET FILEFORMAT PARQUET")

http://git-wip-us.apache.org/repos/asf/spark/blob/c55a39c9/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 30ad392..96c8fa6 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -61,7 +61,7 @@ class HiveDDLCommandSuite extends PlanTest {
         |country STRING COMMENT 'country of origination')
         |COMMENT 'This is the staging page view table'
         |PARTITIONED BY (dt STRING COMMENT 'date type', hour STRING COMMENT 
'hour of the day')
-        |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\054' STORED AS RCFILE
+        |STORED AS RCFILE
         |LOCATION '/user/external/page_view'
         |TBLPROPERTIES ('p1'='v1', 'p2'='v2')
         |AS SELECT * FROM src""".stripMargin
@@ -88,8 +88,6 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(desc.partitionColumns ==
       CatalogColumn("dt", "string", comment = Some("date type")) ::
       CatalogColumn("hour", "string", comment = Some("hour of the day")) :: 
Nil)
-    assert(desc.storage.serdeProperties ==
-      Map((serdeConstants.SERIALIZATION_FORMAT, "\u002C"), 
(serdeConstants.FIELD_DELIM, "\u002C")))
     assert(desc.storage.inputFormat == 
Some("org.apache.hadoop.hive.ql.io.RCFileInputFormat"))
     assert(desc.storage.outputFormat == 
Some("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"))
     assert(desc.storage.serde ==


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to