Repository: spark
Updated Branches:
  refs/heads/master 50ada2a4d -> 04975a68b


[SPARK-22109][SQL] Resolves type conflicts between strings and timestamps in 
partition column

## What changes were proposed in this pull request?

This PR proposes to resolve the type conflicts in strings and timestamps in 
partition column values.
It looks we need to set the timezone as it needs a cast between strings and 
timestamps.

```scala
val df = Seq((1, "2015-01-01 00:00:00"), (2, "2014-01-01 00:00:00"), (3, 
"blah")).toDF("i", "str")
val path = "/tmp/test.parquet"
df.write.format("parquet").partitionBy("str").save(path)
spark.read.parquet(path).show()
```

**Before**

```
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:347)
  at scala.None$.get(Option.scala:345)
  at 
org.apache.spark.sql.catalyst.expressions.TimeZoneAwareExpression$class.timeZone(datetimeExpressions.scala:46)
  at 
org.apache.spark.sql.catalyst.expressions.Cast.timeZone$lzycompute(Cast.scala:172)
  at org.apache.spark.sql.catalyst.expressions.Cast.timeZone(Cast.scala:172)
  at 
org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
  at 
org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3$$anonfun$apply$16.apply(Cast.scala:208)
  at 
org.apache.spark.sql.catalyst.expressions.Cast.org$apache$spark$sql$catalyst$expressions$Cast$$buildCast(Cast.scala:201)
  at 
org.apache.spark.sql.catalyst.expressions.Cast$$anonfun$castToString$3.apply(Cast.scala:207)
  at org.apache.spark.sql.catalyst.expressions.Cast.nullSafeEval(Cast.scala:533)
  at 
org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:331)
  at 
org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:481)
  at 
org.apache.spark.sql.execution.datasources.PartitioningUtils$$anonfun$org$apache$spark$sql$execution$datasources$PartitioningUtils$$resolveTypeConflicts$1.apply(PartitioningUtils.scala:480)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
```

**After**

```
+---+-------------------+
|  i|                str|
+---+-------------------+
|  2|2014-01-01 00:00:00|
|  1|2015-01-01 00:00:00|
|  3|               blah|
+---+-------------------+
```

## How was this patch tested?

Unit tests added in `ParquetPartitionDiscoverySuite` and manual tests.

Author: hyukjinkwon <[email protected]>

Closes #19331 from HyukjinKwon/SPARK-22109.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04975a68
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04975a68
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04975a68

Branch: refs/heads/master
Commit: 04975a68b583a6175f93da52374108e5d4754d9a
Parents: 50ada2a
Author: hyukjinkwon <[email protected]>
Authored: Sun Sep 24 00:05:17 2017 +0900
Committer: Takuya UESHIN <[email protected]>
Committed: Sun Sep 24 00:05:17 2017 +0900

----------------------------------------------------------------------
 .../sql/execution/datasources/PartitioningUtils.scala   | 11 ++++++-----
 .../parquet/ParquetPartitionDiscoverySuite.scala        | 12 ++++++++++++
 2 files changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/04975a68/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 92358da..1c00c9e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -139,7 +139,7 @@ object PartitioningUtils {
           "root directory of the table. If there are multiple root 
directories, " +
           "please load them separately and then union them.")
 
-      val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)
+      val resolvedPartitionValues = 
resolvePartitions(pathsWithPartitionValues, timeZone)
 
       // Creates the StructType which represents the partition columns.
       val fields = {
@@ -318,7 +318,8 @@ object PartitioningUtils {
    * }}}
    */
   def resolvePartitions(
-      pathsWithPartitionValues: Seq[(Path, PartitionValues)]): 
Seq[PartitionValues] = {
+      pathsWithPartitionValues: Seq[(Path, PartitionValues)],
+      timeZone: TimeZone): Seq[PartitionValues] = {
     if (pathsWithPartitionValues.isEmpty) {
       Seq.empty
     } else {
@@ -333,7 +334,7 @@ object PartitioningUtils {
       val values = pathsWithPartitionValues.map(_._2)
       val columnCount = values.head.columnNames.size
       val resolvedValues = (0 until columnCount).map { i =>
-        resolveTypeConflicts(values.map(_.literals(i)))
+        resolveTypeConflicts(values.map(_.literals(i)), timeZone)
       }
 
       // Fills resolved literals back to each partition
@@ -470,7 +471,7 @@ object PartitioningUtils {
    * Given a collection of [[Literal]]s, resolves possible type conflicts by 
up-casting "lower"
    * types.
    */
-  private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
+  private def resolveTypeConflicts(literals: Seq[Literal], timeZone: 
TimeZone): Seq[Literal] = {
     val desiredType = {
       val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
       // Falls back to string if all values of this column are null or empty 
string
@@ -478,7 +479,7 @@ object PartitioningUtils {
     }
 
     literals.map { case l @ Literal(_, dataType) =>
-      Literal.create(Cast(l, desiredType).eval(), desiredType)
+      Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), 
desiredType)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/04975a68/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 837a087..f79b92b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -1055,4 +1055,16 @@ class ParquetPartitionDiscoverySuite extends QueryTest 
with ParquetTest with Sha
       }
     }
   }
+
+  test("SPARK-22109: Resolve type conflicts between strings and timestamps in 
partition column") {
+    val df = Seq(
+      (1, "2015-01-01 00:00:00"),
+      (2, "2014-01-01 00:00:00"),
+      (3, "blah")).toDF("i", "str")
+
+    withTempPath { path =>
+      df.write.format("parquet").partitionBy("str").save(path.getAbsolutePath)
+      checkAnswer(spark.read.load(path.getAbsolutePath), df)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to