Repository: spark
Updated Branches:
  refs/heads/master b28fe4a4a -> 88f559f20


[SPARK-18635][SQL] Partition name/values not escaped correctly in some cases

## What changes were proposed in this pull request?

Due to confusion between URIs and paths, in certain cases we escape partition 
values too many times, which causes some Hive client operations to fail or 
write data to the wrong location. This PR fixes at least some of these cases.

To my understanding, this is how values, filesystem paths, and URIs interact:
- Hive stores raw (unescaped) partition values that are returned to you 
directly when you call listPartitions.
- Internally, we convert these raw values to filesystem paths via 
`ExternalCatalogUtils.[un]escapePathName`.
- In some circumstances we store URIs instead of filesystem paths. When a path 
is converted to a URI via `path.toUri`, the escaped partition values are 
further URI-encoded. This means that to get a path back from a URI, you must 
call `new Path(new URI(uriTxt))` in order to decode the URI-encoded string.
- In `CatalogStorageFormat` we store URIs as strings. This makes it easy to 
forget to URI-decode the value before converting it into a path.
- Finally, the Hive client itself mostly uses Paths to represent locations, 
and uses URIs only occasionally.
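
The round trip described in the list above can be sketched with plain `java.net.URI`, without a Hadoop dependency. This is only an illustration under stated assumptions: `escapePathName` here is a simplified, hypothetical stand-in for `ExternalCatalogUtils.escapePathName` (it escapes just a handful of characters), `toUriString` imitates `path.toUri.toString` by using the multi-argument `URI` constructor, which always quotes `%`, and the `/warehouse/test` location is made up.

```scala
import java.net.URI

object PartitionUriRoundTrip {
  // Hypothetical, simplified stand-in for ExternalCatalogUtils.escapePathName:
  // percent-encodes only a small, illustrative set of special characters.
  def escapePathName(value: String): String = {
    val sb = new StringBuilder
    value.foreach { c =>
      if ("%=/:#".indexOf(c) >= 0) sb.append(f"%%${c.toInt}%02X") else sb.append(c)
    }
    sb.toString
  }

  // Path -> URI string. The multi-argument URI constructor quotes '%',
  // so an already escaped "%25" becomes "%2525" in the URI text.
  def toUriString(fsPath: String): String =
    new URI("file", null, fsPath, null, null).toString

  // URI string -> path. Parsing the text as a URI and calling getPath
  // undoes exactly one level of URI encoding.
  def toFsPath(uriText: String): String =
    new URI(uriText).getPath

  def main(args: Array[String]): Unit = {
    val escaped = escapePathName("%")                 // raw value "%" -> "%25"
    val fsPath = s"/warehouse/test/A=0/B=$escaped"    // assumed table location
    val uriText = toUriString(fsPath)

    println(uriText)            // file:/warehouse/test/A=0/B=%2525
    println(toFsPath(uriText))  // /warehouse/test/A=0/B=%25
  }
}
```

Skipping the `new URI(...)` step and treating the stored string directly as a path would leave `B=%2525` in the directory name, which is the kind of extra escaping this PR addresses.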

In the future we should probably clean this up, perhaps by dropping use of URIs 
when unnecessary. We should also try fixing escaping for partition names as 
well as values, though names are unlikely to contain special characters.

cc mallman cloud-fan yhuai

## How was this patch tested?

Unit tests.

Author: Eric Liang <e...@databricks.com>

Closes #16071 from ericl/spark-18635.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88f559f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88f559f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88f559f2

Branch: refs/heads/master
Commit: 88f559f20a5208f2386b874eb119f1cba2c748c7
Parents: b28fe4a
Author: Eric Liang <e...@databricks.com>
Authored: Thu Dec 1 16:48:10 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Dec 1 16:48:10 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  3 ++
 .../spark/sql/hive/HiveExternalCatalog.scala    |  5 +-
 .../apache/spark/sql/hive/client/HiveShim.scala |  6 ++-
 .../PartitionProviderCompatibilitySuite.scala   | 54 ++++++++++++++++++++
 4 files changed, 64 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88f559f2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index d8bc867..d2a1af0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -44,6 +44,9 @@ case class CatalogFunction(
  * Storage format, used to describe how a partition or a table is stored.
  */
 case class CatalogStorageFormat(
+    // TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must
+    // be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and
+    // path.toUri respectively before use as a filesystem path due to URI char escaping.
     locationUri: Option[String],
     inputFormat: Option[String],
     outputFormat: Option[String],

http://git-wip-us.apache.org/repos/asf/spark/blob/88f559f2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index fd9dc32..1a9943b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import java.io.IOException
+import java.net.URI
 import java.util
 
 import scala.util.control.NonFatal
@@ -833,10 +834,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       // However, Hive metastore is not case preserving and will generate wrong partition location
       // with lower cased partition column names. Here we set the default partition location
       // manually to avoid this problem.
-      val partitionPath = p.storage.locationUri.map(new Path(_)).getOrElse {
+      val partitionPath = p.storage.locationUri.map(uri => new Path(new URI(uri))).getOrElse {
         ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath)
       }
-      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString)))
+      p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri.toString)))
     }
     val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
     client.createPartitions(db, table, lowerCasedParts, ignoreIfExists)

http://git-wip-us.apache.org/repos/asf/spark/blob/88f559f2/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 3d9642d..e561706 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -268,7 +268,8 @@ private[client] class Shim_v0_12 extends Shim with Logging {
       ignoreIfExists: Boolean): Unit = {
     val table = hive.getTable(database, tableName)
     parts.foreach { s =>
-      val location = s.storage.locationUri.map(new Path(table.getPath, _)).orNull
+      val location = s.storage.locationUri.map(
+        uri => new Path(table.getPath, new Path(new URI(uri)))).orNull
       val params = if (s.parameters.nonEmpty) s.parameters.asJava else null
       val spec = s.spec.asJava
       if (hive.getPartition(table, spec, false) != null && ignoreIfExists) {
@@ -463,7 +464,8 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
       ignoreIfExists: Boolean): Unit = {
     val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists)
     parts.zipWithIndex.foreach { case (s, i) =>
-      addPartitionDesc.addPartition(s.spec.asJava, s.storage.locationUri.orNull)
+      addPartitionDesc.addPartition(
+        s.spec.asJava, s.storage.locationUri.map(u => new Path(new URI(u)).toString).orNull)
       if (s.parameters.nonEmpty) {
         addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/88f559f2/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index cace5fa..e8e4238 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -205,6 +205,60 @@ class PartitionProviderCompatibilitySuite
         }
       }
     }
+
+    test(s"SPARK-18635 special chars in partition values - partition management $enabled") {
+      withTable("test") {
+        spark.range(10)
+          .selectExpr("id", "id as A", "'%' as B")
+          .write.partitionBy("A", "B").mode("overwrite")
+          .saveAsTable("test")
+        assert(spark.sql("select * from test").count() == 10)
+        assert(spark.sql("select * from test where B = '%'").count() == 10)
+        assert(spark.sql("select * from test where B = '$'").count() == 0)
+        spark.range(10)
+          .selectExpr("id", "id as A", "'=' as B")
+          .write.mode("append").insertInto("test")
+        spark.sql("insert into test partition (A, B) select id, id, '%=' from range(10)")
+        assert(spark.sql("select * from test").count() == 30)
+        assert(spark.sql("select * from test where B = '%'").count() == 10)
+        assert(spark.sql("select * from test where B = '='").count() == 10)
+        assert(spark.sql("select * from test where B = '%='").count() == 10)
+
+        // show partitions sanity check
+        val parts = spark.sql("show partitions test").collect().map(_.get(0)).toSeq
+        assert(parts.length == 30)
+        assert(parts.contains("A=0/B=%25"))
+        assert(parts.contains("A=0/B=%3D"))
+        assert(parts.contains("A=0/B=%25%3D"))
+
+        // drop partition sanity check
+        spark.sql("alter table test drop partition (A=1, B='%')")
+        assert(spark.sql("select * from test").count() == 29)  // 1 file in dropped partition
+
+        withTempDir { dir =>
+          // custom locations sanity check
+          spark.sql(s"""
+            |alter table test partition (A=0, B='%')
+            |set location '${dir.getAbsolutePath}'""".stripMargin)
+          assert(spark.sql("select * from test").count() == 28)  // moved to empty dir
+
+          // rename partition sanity check
+          spark.sql(s"""
+            |alter table test partition (A=5, B='%')
+            |rename to partition (A=100, B='%')""".stripMargin)
+          assert(spark.sql("select * from test where a = 5 and b = '%'").count() == 0)
+          assert(spark.sql("select * from test where a = 100 and b = '%'").count() == 1)
+
+          // try with A=0 which has a custom location
+          spark.sql("insert into test partition (A=0, B='%') select 1")
+          spark.sql(s"""
+            |alter table test partition (A=0, B='%')
+            |rename to partition (A=101, B='%')""".stripMargin)
+          assert(spark.sql("select * from test where a = 0 and b = '%'").count() == 0)
+          assert(spark.sql("select * from test where a = 101 and b = '%'").count() == 1)
+        }
+      }
+    }
   }
 
   /**

