Repository: spark
Updated Branches:
refs/heads/branch-2.0 0c721eedc -> d02f2926b
[SPARK-15715][SQL] Fix alter partition with storage information in Hive
## What changes were proposed in this pull request?
This command didn't work for Hive tables. Now it does:
```
ALTER TABLE boxes PARTITION (width=3)
SET SERDE 'com.sparkbricks.serde.ColumnarSerDe'
WITH SERDEPROPERTIES ('compress'='true')
```
## How was this patch tested?
`HiveExternalCatalogSuite`
Author: Andrew Or <[email protected]>
Closes #13453 from andrewor14/alter-partition-storage.
(cherry picked from commit d1c1fbc345a704a2c8210960683f33f945660d5a)
Signed-off-by: Andrew Or <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d02f2926
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d02f2926
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d02f2926
Branch: refs/heads/branch-2.0
Commit: d02f2926bfbbeb6593cc1deccfb0360ba5b4f0f0
Parents: 0c721ee
Author: Andrew Or <[email protected]>
Authored: Thu Jun 2 17:44:48 2016 -0700
Committer: Andrew Or <[email protected]>
Committed: Thu Jun 2 17:44:56 2016 -0700
----------------------------------------------------------------------
.../catalyst/catalog/ExternalCatalogSuite.scala | 10 +++++++
.../spark/sql/hive/client/HiveClientImpl.scala | 30 ++++++++++++++------
.../spark/sql/hive/client/VersionsSuite.scala | 5 +++-
.../spark/sql/hive/execution/HiveDDLSuite.scala | 22 ++++++++++++++
4 files changed, 57 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d02f2926/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 377e64b..0c4d363 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -382,6 +382,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
// See HIVE-2742 for more detail.
catalog.setCurrentDatabase("db2")
val newLocation = newUriForDatabase()
+ val newSerde = "com.sparkbricks.text.EasySerde"
+ val newSerdeProps = Map("spark" -> "bricks", "compressed" -> "false")
// alter but keep spec the same
val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
@@ -394,6 +396,14 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
assert(newPart2.storage.locationUri == Some(newLocation))
assert(oldPart1.storage.locationUri != Some(newLocation))
assert(oldPart2.storage.locationUri != Some(newLocation))
+ // alter other storage information
+ catalog.alterPartitions("db2", "tbl2", Seq(
+ oldPart1.copy(storage = storageFormat.copy(serde = Some(newSerde))),
+     oldPart2.copy(storage = storageFormat.copy(serdeProperties = newSerdeProps))))
+ val newPart1b = catalog.getPartition("db2", "tbl2", part1.spec)
+ val newPart2b = catalog.getPartition("db2", "tbl2", part2.spec)
+ assert(newPart1b.storage.serde == Some(newSerde))
+ assert(newPart2b.storage.serdeProperties == newSerdeProps)
    // alter but change spec, should fail because new partition specs do not exist yet
val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2"))
val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4"))
http://git-wip-us.apache.org/repos/asf/spark/blob/d02f2926/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 47fa418..1c89d8c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.{TableType => HiveTableType}
import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema}
+import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.processors._
@@ -779,18 +780,29 @@ private[hive] class HiveClientImpl(
hiveTable
}
- private def toHiveViewTable(view: CatalogTable): HiveTable = {
- val tbl = toHiveTable(view)
- tbl.setTableType(HiveTableType.VIRTUAL_VIEW)
- tbl.setSerializationLib(null)
- tbl.clearSerDeInfo()
- tbl
- }
-
private def toHivePartition(
p: CatalogTablePartition,
ht: HiveTable): HivePartition = {
-    new HivePartition(ht, p.spec.asJava, p.storage.locationUri.map { l => new Path(l) }.orNull)
+ val tpart = new org.apache.hadoop.hive.metastore.api.Partition
+ val partValues = ht.getPartCols.asScala.map { hc =>
+ p.spec.get(hc.getName).getOrElse {
+ throw new IllegalArgumentException(
+          s"Partition spec is missing a value for column '${hc.getName}': ${p.spec}")
+ }
+ }
+ val storageDesc = new StorageDescriptor
+ val serdeInfo = new SerDeInfo
+ p.storage.locationUri.foreach(storageDesc.setLocation)
+ p.storage.inputFormat.foreach(storageDesc.setInputFormat)
+ p.storage.outputFormat.foreach(storageDesc.setOutputFormat)
+ p.storage.serde.foreach(serdeInfo.setSerializationLib)
+ serdeInfo.setParameters(p.storage.serdeProperties.asJava)
+ storageDesc.setSerdeInfo(serdeInfo)
+ tpart.setDbName(ht.getDbName)
+ tpart.setTableName(ht.getTableName)
+ tpart.setValues(partValues.asJava)
+ tpart.setSd(storageDesc)
+ new HivePartition(ht, tpart)
}
private def fromHivePartition(hp: HivePartition): CatalogTablePartition = {
http://git-wip-us.apache.org/repos/asf/spark/blob/d02f2926/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 8ae4535..5b209ac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -354,7 +354,10 @@ class VersionsSuite extends SparkFunSuite with Logging {
test(s"$version: alterPartitions") {
val spec = Map("key1" -> "1", "key2" -> "2")
val newLocation = Utils.createTempDir().getPath()
- val storage = storageFormat.copy(locationUri = Some(newLocation))
+ val storage = storageFormat.copy(
+ locationUri = Some(newLocation),
+ // needed for 0.12 alter partitions
+ serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
val partition = CatalogTablePartition(spec, storage)
client.alterPartitions("default", "src_part", Seq(partition))
assert(client.getPartition("default", "src_part", spec)
http://git-wip-us.apache.org/repos/asf/spark/blob/d02f2926/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index a98d469..b2f01fc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -345,6 +345,28 @@ class HiveDDLSuite
}
}
+ test("alter table partition - storage information") {
+    sql("CREATE TABLE boxes (height INT, length INT) PARTITIONED BY (width INT)")
+ sql("INSERT OVERWRITE TABLE boxes PARTITION (width=4) SELECT 4, 4")
+ val catalog = spark.sessionState.catalog
+ val expectedSerde = "com.sparkbricks.serde.ColumnarSerDe"
+ val expectedSerdeProps = Map("compress" -> "true")
+ val expectedSerdePropsString =
+ expectedSerdeProps.map { case (k, v) => s"'$k'='$v'" }.mkString(", ")
+    val oldPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" -> "4"))
+    assume(oldPart.storage.serde != Some(expectedSerde), "bad test: serde was already set")
+    assume(oldPart.storage.serdeProperties.filterKeys(expectedSerdeProps.contains) !=
+      expectedSerdeProps, "bad test: serde properties were already set")
+ sql(s"""ALTER TABLE boxes PARTITION (width=4)
+ | SET SERDE '$expectedSerde'
+ | WITH SERDEPROPERTIES ($expectedSerdePropsString)
+ |""".stripMargin)
+    val newPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" -> "4"))
+ assert(newPart.storage.serde == Some(expectedSerde))
+    assume(newPart.storage.serdeProperties.filterKeys(expectedSerdeProps.contains) ==
+      expectedSerdeProps)
+ }
+
test("drop table using drop view") {
withTable("tab1") {
sql("CREATE TABLE tab1(c1 int)")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]