Repository: spark
Updated Branches:
  refs/heads/branch-2.0 dc6e94157 -> 80a40e8e2


[SPARK-15594][SQL] ALTER TABLE SERDEPROPERTIES does not respect partition spec

## What changes were proposed in this pull request?

Before this patch, these commands ignored the partition spec and changed the
storage properties of the table itself:
```
ALTER TABLE table_name PARTITION (a=1, b=2) SET SERDE 'my_serde'
ALTER TABLE table_name PARTITION (a=1, b=2) SET SERDEPROPERTIES ('key1'='val1')
```
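Now they change the storage properties of the specified partition only. For tables created with the datasource API, specifying a partition in these commands throws an AnalysisException.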

## How was this patch tested?

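New tests in DDLSuite: "alter table: set serde partition" and "alter table: set serde partition (datasource table)".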

Author: Andrew Or <[email protected]>

Closes #13343 from andrewor14/alter-table-serdeproperties.

(cherry picked from commit 4a2fb8b87ca4517e0f4a1d7a1a1b3c08c1c1294d)
Signed-off-by: Yin Huai <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80a40e8e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80a40e8e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80a40e8e

Branch: refs/heads/branch-2.0
Commit: 80a40e8e2cc198c34dabbc431d4ca302319fbbad
Parents: dc6e941
Author: Andrew Or <[email protected]>
Authored: Fri May 27 17:27:24 2016 -0700
Committer: Yin Huai <[email protected]>
Committed: Fri May 27 17:27:38 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/command/ddl.scala       | 26 ++++++--
 .../spark/sql/execution/command/DDLSuite.scala  | 64 ++++++++++++++++++++
 2 files changed, 84 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/80a40e8e/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 95bac94..5fd0b83 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -293,7 +293,7 @@ case class AlterTableSerDePropertiesCommand(
     tableName: TableIdentifier,
     serdeClassName: Option[String],
     serdeProperties: Option[Map[String, String]],
-    partition: Option[Map[String, String]])
+    partSpec: Option[TablePartitionSpec])
   extends RunnableCommand {
 
   // should never happen if we parsed things correctly
@@ -306,15 +306,29 @@ case class AlterTableSerDePropertiesCommand(
       "ALTER TABLE SERDEPROPERTIES")
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
-    // Do not support setting serde for datasource tables
+    // For datasource tables, disallow setting serde or specifying partition
+    if (partSpec.isDefined && DDLUtils.isDatasourceTable(table)) {
+      throw new AnalysisException("Operation not allowed: ALTER TABLE SET " +
+        "[SERDE | SERDEPROPERTIES] for a specific partition is not supported " 
+
+        "for tables created with the datasource API")
+    }
     if (serdeClassName.isDefined && DDLUtils.isDatasourceTable(table)) {
       throw new AnalysisException("Operation not allowed: ALTER TABLE SET 
SERDE is " +
         "not supported for tables created with the datasource API")
     }
-    val newTable = table.withNewStorage(
-      serde = serdeClassName.orElse(table.storage.serde),
-      serdeProperties = table.storage.serdeProperties ++ 
serdeProperties.getOrElse(Map()))
-    catalog.alterTable(newTable)
+    if (partSpec.isEmpty) {
+      val newTable = table.withNewStorage(
+        serde = serdeClassName.orElse(table.storage.serde),
+        serdeProperties = table.storage.serdeProperties ++ 
serdeProperties.getOrElse(Map()))
+      catalog.alterTable(newTable)
+    } else {
+      val spec = partSpec.get
+      val part = catalog.getPartition(tableName, spec)
+      val newPart = part.copy(storage = part.storage.copy(
+        serde = serdeClassName.orElse(part.storage.serde),
+        serdeProperties = part.storage.serdeProperties ++ 
serdeProperties.getOrElse(Map())))
+      catalog.alterPartitions(tableName, Seq(newPart))
+    }
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/80a40e8e/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index ccb4006..5d45cfb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -538,6 +538,14 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
     testSetSerde(isDatasourceTable = true)
   }
 
+  test("alter table: set serde partition") {
+    testSetSerdePartition(isDatasourceTable = false)
+  }
+
+  test("alter table: set serde partition (datasource table)") {
+    testSetSerdePartition(isDatasourceTable = true)
+  }
+
   test("alter table: bucketing is not supported") {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))
@@ -931,6 +939,62 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
     assert(e.getMessage.contains(DATASOURCE_PREFIX + "foo"))
   }
 
+  private def testSetSerdePartition(isDatasourceTable: Boolean): Unit = {
+    val catalog = spark.sessionState.catalog
+    val tableIdent = TableIdentifier("tab1", Some("dbx"))
+    val spec = Map("a" -> "1", "b" -> "2")
+    createDatabase(catalog, "dbx")
+    createTable(catalog, tableIdent)
+    createTablePartition(catalog, spec, tableIdent)
+    createTablePartition(catalog, Map("a" -> "1", "b" -> "3"), tableIdent)
+    createTablePartition(catalog, Map("a" -> "2", "b" -> "2"), tableIdent)
+    createTablePartition(catalog, Map("a" -> "2", "b" -> "3"), tableIdent)
+    if (isDatasourceTable) {
+      convertToDatasourceTable(catalog, tableIdent)
+    }
+    assert(catalog.getPartition(tableIdent, spec).storage.serde.isEmpty)
+    assert(catalog.getPartition(tableIdent, 
spec).storage.serdeProperties.isEmpty)
+    // set table serde and/or properties (should fail on datasource tables)
+    if (isDatasourceTable) {
+      val e1 = intercept[AnalysisException] {
+        sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 'whatever'")
+      }
+      val e2 = intercept[AnalysisException] {
+        sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 
'org.apache.madoop' " +
+          "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+      }
+      assert(e1.getMessage.contains("datasource"))
+      assert(e2.getMessage.contains("datasource"))
+    } else {
+      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 
'org.apache.jadoop'")
+      assert(catalog.getPartition(tableIdent, spec).storage.serde == 
Some("org.apache.jadoop"))
+      assert(catalog.getPartition(tableIdent, 
spec).storage.serdeProperties.isEmpty)
+      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) SET SERDE 
'org.apache.madoop' " +
+        "WITH SERDEPROPERTIES ('k' = 'v', 'kay' = 'vee')")
+      assert(catalog.getPartition(tableIdent, spec).storage.serde == 
Some("org.apache.madoop"))
+      assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
+        Map("k" -> "v", "kay" -> "vee"))
+    }
+    // set serde properties only
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE dbx.tab1 PARTITION (a=1, b=2) " +
+        "SET SERDEPROPERTIES ('k' = 'vvv', 'kay' = 'vee')")
+      assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
+        Map("k" -> "vvv", "kay" -> "vee"))
+    }
+    // set things without explicitly specifying database
+    catalog.setCurrentDatabase("dbx")
+    maybeWrapException(isDatasourceTable) {
+      sql("ALTER TABLE tab1 PARTITION (a=1, b=2) SET SERDEPROPERTIES ('kay' = 
'veee')")
+      assert(catalog.getPartition(tableIdent, spec).storage.serdeProperties ==
+        Map("k" -> "vvv", "kay" -> "veee"))
+    }
+    // table to alter does not exist
+    intercept[AnalysisException] {
+      sql("ALTER TABLE does_not_exist SET SERDEPROPERTIES ('x' = 'y')")
+    }
+  }
+
   private def testAddPartitions(isDatasourceTable: Boolean): Unit = {
     val catalog = spark.sessionState.catalog
     val tableIdent = TableIdentifier("tab1", Some("dbx"))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to