Repository: spark
Updated Branches:
refs/heads/master 2d868d939 -> 6d7ebf2f9
[SPARK-22165][SQL] Fixes type conflicts between double, long, decimals, dates and timestamps in partition column
## What changes were proposed in this pull request?
This PR proposes to add a rule that re-uses `TypeCoercion.findWiderCommonType`
when resolving type conflicts in partition values.
Currently, type conflicts are resolved with a numeric-precedence-like comparison,
which introduces failures for conflicts between timestamps, dates and decimals; see:
```scala
private val upCastingOrder: Seq[DataType] =
  Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
...
literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
```
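
For reference, the replacement added by this patch (see the `PartitioningUtils.scala` hunk below) boils down to a pairwise widening function. A minimal sketch, assuming the patched tree where `TypeCoercion.findWiderTypeForTwo` has been made public (it was `private[analysis]` before):
```scala
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.types._

// Pairwise widening for partition-column types: lossy double/decimal and
// double/long combinations fall back to string instead of losing precision.
val findWiderTypeForPartitionColumn: (DataType, DataType) => DataType = {
  case (DoubleType, _: DecimalType) | (_: DecimalType, DoubleType) => StringType
  case (DoubleType, LongType) | (LongType, DoubleType) => StringType
  case (t1, t2) => TypeCoercion.findWiderTypeForTwo(t1, t2).getOrElse(StringType)
}

// The common type of a column is the reduction over all inferred types, e.g.:
// Seq(DateType, TimestampType).reduce(findWiderTypeForPartitionColumn) == TimestampType
```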
The code below:
```scala
val df = Seq((1, "2015-01-01"), (2, "2016-01-01 00:00:00")).toDF("i", "ts")
df.write.format("parquet").partitionBy("ts").save("/tmp/foo")
spark.read.load("/tmp/foo").printSchema()

val df = Seq((1, "1"), (2, "1" * 30)).toDF("i", "decimal")
df.write.format("parquet").partitionBy("decimal").save("/tmp/bar")
spark.read.load("/tmp/bar").printSchema()
```
produces the following output:
**Before**
```
root
 |-- i: integer (nullable = true)
 |-- ts: date (nullable = true)

root
 |-- i: integer (nullable = true)
 |-- decimal: integer (nullable = true)
```
**After**
```
root
 |-- i: integer (nullable = true)
 |-- ts: timestamp (nullable = true)

root
 |-- i: integer (nullable = true)
 |-- decimal: decimal(30,0) (nullable = true)
```
### Type coercion table:
This PR proposes the type conflict resolution below:
**Before**
|InputA \ InputB|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|------------------------|----------|----------|----------|----------|----------|----------|----------|----------|
|**`NullType`**|`StringType`|`IntegerType`|`LongType`|`StringType`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`IntegerType`**|`IntegerType`|`IntegerType`|`LongType`|`IntegerType`|`DoubleType`|`IntegerType`|`IntegerType`|`StringType`|
|**`LongType`**|`LongType`|`LongType`|`LongType`|`LongType`|`DoubleType`|`LongType`|`LongType`|`StringType`|
|**`DecimalType(38,0)`**|`StringType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DecimalType(38,0)`|`DecimalType(38,0)`|`StringType`|
|**`DoubleType`**|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`DoubleType`|`StringType`|
|**`DateType`**|`StringType`|`IntegerType`|`LongType`|`DateType`|`DoubleType`|`DateType`|`DateType`|`StringType`|
|**`TimestampType`**|`StringType`|`IntegerType`|`LongType`|`TimestampType`|`DoubleType`|`TimestampType`|`TimestampType`|`StringType`|
|**`StringType`**|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|
**After**
|InputA \ InputB|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|------------------------|----------|----------|----------|----------|----------|----------|----------|----------|
|**`NullType`**|`NullType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`DateType`|`TimestampType`|`StringType`|
|**`IntegerType`**|`IntegerType`|`IntegerType`|`LongType`|`DecimalType(38,0)`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`LongType`**|`LongType`|`LongType`|`LongType`|`DecimalType(38,0)`|`StringType`|`StringType`|`StringType`|`StringType`|
|**`DecimalType(38,0)`**|`DecimalType(38,0)`|`DecimalType(38,0)`|`DecimalType(38,0)`|`DecimalType(38,0)`|`StringType`|`StringType`|`StringType`|`StringType`|
|**`DoubleType`**|`DoubleType`|`DoubleType`|`StringType`|`StringType`|`DoubleType`|`StringType`|`StringType`|`StringType`|
|**`DateType`**|`DateType`|`StringType`|`StringType`|`StringType`|`StringType`|`DateType`|`TimestampType`|`StringType`|
|**`TimestampType`**|`TimestampType`|`StringType`|`StringType`|`StringType`|`StringType`|`TimestampType`|`TimestampType`|`StringType`|
|**`StringType`**|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|`StringType`|
This was produced by:
```scala
test("Print out chart") {
val supportedTypes: Seq[DataType] = Seq(
NullType, IntegerType, LongType, DecimalType(38, 0), DoubleType,
DateType, TimestampType, StringType)
// Old type conflict resolution:
val upCastingOrder: Seq[DataType] =
Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
def oldResolveTypeConflicts(dataTypes: Seq[DataType]): DataType = {
val topType = dataTypes.maxBy(upCastingOrder.indexOf(_))
if (topType == NullType) StringType else topType
}
println(s"|InputA \\ InputB|${supportedTypes.map(dt =>
s"`${dt.toString}`").mkString("|")}|")
println(s"|------------------------|${supportedTypes.map(_ =>
"----------").mkString("|")}|")
supportedTypes.foreach { inputA =>
val types = supportedTypes.map(inputB =>
oldResolveTypeConflicts(Seq(inputA, inputB)))
println(s"|**`$inputA`**|${types.map(dt =>
s"`${dt.toString}`").mkString("|")}|")
}
// New type conflict resolution:
def newResolveTypeConflicts(dataTypes: Seq[DataType]): DataType = {
dataTypes.fold[DataType](NullType)(findWiderTypeForPartitionColumn)
}
println(s"|InputA \\ InputB|${supportedTypes.map(dt =>
s"`${dt.toString}`").mkString("|")}|")
println(s"|------------------------|${supportedTypes.map(_ =>
"----------").mkString("|")}|")
supportedTypes.foreach { inputA =>
val types = supportedTypes.map(inputB =>
newResolveTypeConflicts(Seq(inputA, inputB)))
println(s"|**`$inputA`**|${types.map(dt =>
s"`${dt.toString}`").mkString("|")}|")
}
}
```
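
One caveat on the `DecimalType(38,0)` entries in the tables above: decimal is only inferred for integral values (`BigInteger`/`BigInt`-like), so fractional partition values fall under the double column instead. A quick spark-shell illustration (the path is a placeholder; the commented schema is the expected result):
```scala
val df = Seq((1, "1.1"), (2, "2.2")).toDF("i", "frac")
df.write.format("parquet").partitionBy("frac").save("/tmp/frac")
spark.read.load("/tmp/frac").printSchema()
// root
//  |-- i: integer (nullable = true)
//  |-- frac: double (nullable = true)
```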
## How was this patch tested?
Unit tests added in `ParquetPartitionDiscoverySuite`.
Author: hyukjinkwon <[email protected]>
Closes #19389 from HyukjinKwon/partition-type-coercion.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6d7ebf2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6d7ebf2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6d7ebf2f
Branch: refs/heads/master
Commit: 6d7ebf2f9fbd043813738005a23c57a77eba6f47
Parents: 2d868d9
Author: hyukjinkwon <[email protected]>
Authored: Tue Nov 21 20:53:38 2017 +0100
Committer: Wenchen Fan <[email protected]>
Committed: Tue Nov 21 20:53:38 2017 +0100
----------------------------------------------------------------------
docs/sql-programming-guide.md | 139 +++++++++++++++++++
.../sql/catalyst/analysis/TypeCoercion.scala | 2 +-
.../datasources/PartitioningUtils.scala | 60 +++++---
.../ParquetPartitionDiscoverySuite.scala | 57 +++++++-
4 files changed, 235 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6d7ebf2f/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 686fcb1..5f98213 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1577,6 +1577,145 @@ options.
- Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named `_corrupt_record` by default). For example, `spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()` and `spark.read.schema(schema).json(file).select("_corrupt_record").show()`. Instead, you can cache or save the parsed results and then send the same query. For example, `val df = spark.read.schema(schema).json(file).cache()` and then `df.filter($"_corrupt_record".isNotNull).count()`.
- The `percentile_approx` function previously accepted numeric type input and output double type results. Now it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles.
+ - Partition column inference previously found incorrect common type for different inferred types, for example, previously it ended up with double type as the common type for double type and date type. Now it finds the correct common type for such conflicts. The conflict resolution follows the table below:
+
+ <table class="table">
+ <tr>
+ <th>
+ <b>InputA \ InputB</b>
+ </th>
+ <th>
+ <b>NullType</b>
+ </th>
+ <th>
+ <b>IntegerType</b>
+ </th>
+ <th>
+ <b>LongType</b>
+ </th>
+ <th>
+ <b>DecimalType(38,0)*</b>
+ </th>
+ <th>
+ <b>DoubleType</b>
+ </th>
+ <th>
+ <b>DateType</b>
+ </th>
+ <th>
+ <b>TimestampType</b>
+ </th>
+ <th>
+ <b>StringType</b>
+ </th>
+ </tr>
+ <tr>
+ <td>
+ <b>NullType</b>
+ </td>
+ <td>NullType</td>
+ <td>IntegerType</td>
+ <td>LongType</td>
+ <td>DecimalType(38,0)</td>
+ <td>DoubleType</td>
+ <td>DateType</td>
+ <td>TimestampType</td>
+ <td>StringType</td>
+ </tr>
+ <tr>
+ <td>
+ <b>IntegerType</b>
+ </td>
+ <td>IntegerType</td>
+ <td>IntegerType</td>
+ <td>LongType</td>
+ <td>DecimalType(38,0)</td>
+ <td>DoubleType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ </tr>
+ <tr>
+ <td>
+ <b>LongType</b>
+ </td>
+ <td>LongType</td>
+ <td>LongType</td>
+ <td>LongType</td>
+ <td>DecimalType(38,0)</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ </tr>
+ <tr>
+ <td>
+ <b>DecimalType(38,0)*</b>
+ </td>
+ <td>DecimalType(38,0)</td>
+ <td>DecimalType(38,0)</td>
+ <td>DecimalType(38,0)</td>
+ <td>DecimalType(38,0)</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ </tr>
+ <tr>
+ <td>
+ <b>DoubleType</b>
+ </td>
+ <td>DoubleType</td>
+ <td>DoubleType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>DoubleType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ </tr>
+ <tr>
+ <td>
+ <b>DateType</b>
+ </td>
+ <td>DateType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>DateType</td>
+ <td>TimestampType</td>
+ <td>StringType</td>
+ </tr>
+ <tr>
+ <td>
+ <b>TimestampType</b>
+ </td>
+ <td>TimestampType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>TimestampType</td>
+ <td>TimestampType</td>
+ <td>StringType</td>
+ </tr>
+ <tr>
+ <td>
+ <b>StringType</b>
+ </td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ <td>StringType</td>
+ </tr>
+ </table>
+
+ Note that, for <b>DecimalType(38,0)*</b>, the table above intentionally does not cover all other combinations of scales and precisions because currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
## Upgrading From Spark SQL 2.1 to 2.2
http://git-wip-us.apache.org/repos/asf/spark/blob/6d7ebf2f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index 074eda5..28be955 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -155,7 +155,7 @@ object TypeCoercion {
* i.e. the main difference with [[findTightestCommonType]] is that here we allow some
* loss of precision when widening decimal and double, and promotion to string.
*/
- private[analysis] def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = {
+ def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = {
findTightestCommonType(t1, t2)
.orElse(findWiderTypeForDecimal(t1, t2))
.orElse(stringPromotion(t1, t2))
http://git-wip-us.apache.org/repos/asf/spark/blob/6d7ebf2f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 1c00c9e..472bf82 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.Resolver
+import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -309,13 +309,8 @@ object PartitioningUtils {
}
/**
- * Resolves possible type conflicts between partitions by up-casting "lower" types. The up-
- * casting order is:
- * {{{
- *   NullType ->
- *   IntegerType -> LongType ->
- *   DoubleType -> StringType
- * }}}
+ * Resolves possible type conflicts between partitions by up-casting "lower" types using
+ * [[findWiderTypeForPartitionColumn]].
*/
def resolvePartitions(
pathsWithPartitionValues: Seq[(Path, PartitionValues)],
@@ -372,11 +367,31 @@ object PartitioningUtils {
suspiciousPaths.map("\t" + _).mkString("\n", "\n", "")
}
+ // scalastyle:off line.size.limit
  /**
-  * Converts a string to a [[Literal]] with automatic type inference. Currently only supports
-  * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType]], [[DateType]]
+  * Converts a string to a [[Literal]] with automatic type inference. Currently only supports
+  * [[NullType]], [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType]], [[DateType]]
   * [[TimestampType]], and [[StringType]].
+  *
+  * When resolving conflicts, it follows the table below:
+  *
+  * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
+  * | InputA \ InputB    | NullType          | IntegerType       | LongType          | DecimalType(38,0)* | DoubleType | DateType      | TimestampType | StringType |
+  * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
+  * | NullType           | NullType          | IntegerType       | LongType          | DecimalType(38,0)  | DoubleType | DateType      | TimestampType | StringType |
+  * | IntegerType        | IntegerType       | IntegerType       | LongType          | DecimalType(38,0)  | DoubleType | StringType    | StringType    | StringType |
+  * | LongType           | LongType          | LongType          | LongType          | DecimalType(38,0)  | StringType | StringType    | StringType    | StringType |
+  * | DecimalType(38,0)* | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0) | DecimalType(38,0)  | StringType | StringType    | StringType    | StringType |
+  * | DoubleType         | DoubleType        | DoubleType        | StringType        | StringType         | DoubleType | StringType    | StringType    | StringType |
+  * | DateType           | DateType          | StringType        | StringType        | StringType         | StringType | DateType      | TimestampType | StringType |
+  * | TimestampType      | TimestampType     | StringType        | StringType        | StringType         | StringType | TimestampType | TimestampType | StringType |
+  * | StringType         | StringType        | StringType        | StringType        | StringType         | StringType | StringType    | StringType    | StringType |
+  * +--------------------+-------------------+-------------------+-------------------+--------------------+------------+---------------+---------------+------------+
+  * Note that, for DecimalType(38,0)*, the table above intentionally does not cover all other
+  * combinations of scales and precisions because currently we only infer decimal type like
+  * `BigInteger`/`BigInt`. For example, 1.1 is inferred as double type.
   */
+ // scalastyle:on line.size.limit
private[datasources] def inferPartitionColumnValue(
raw: String,
typeInference: Boolean,
@@ -427,9 +442,6 @@ object PartitioningUtils {
}
}
- private val upCastingOrder: Seq[DataType] =
- Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
-
def validatePartitionColumn(
schema: StructType,
partitionColumns: Seq[String],
@@ -468,18 +480,26 @@ object PartitioningUtils {
}
/**
- * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
- * types.
+ * Given a collection of [[Literal]]s, resolves possible type conflicts by
+ * [[findWiderTypeForPartitionColumn]].
*/
private def resolveTypeConflicts(literals: Seq[Literal], timeZone: TimeZone): Seq[Literal] = {
- val desiredType = {
- val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
- // Falls back to string if all values of this column are null or empty string
- if (topType == NullType) StringType else topType
- }
+ val litTypes = literals.map(_.dataType)
+ val desiredType = litTypes.reduce(findWiderTypeForPartitionColumn)
literals.map { case l @ Literal(_, dataType) =>
Literal.create(Cast(l, desiredType, Some(timeZone.getID)).eval(), desiredType)
}
}
+
+ /**
+ * Type widening rule for partition column types. It is similar to
+ * [[TypeCoercion.findWiderTypeForTwo]] but the main difference is that here we disallow
+ * precision loss when widening double/long and decimal, and fall back to string.
+ */
+ private val findWiderTypeForPartitionColumn: (DataType, DataType) => DataType = {
+   case (DoubleType, _: DecimalType) | (_: DecimalType, DoubleType) => StringType
+   case (DoubleType, LongType) | (LongType, DoubleType) => StringType
+   case (t1, t2) => TypeCoercion.findWiderTypeForTwo(t1, t2).getOrElse(StringType)
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/6d7ebf2f/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index f79b92b..d490264 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -249,6 +249,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
true,
rootPaths,
timeZoneId)
+ assert(actualSpec.partitionColumns === spec.partitionColumns)
+ assert(actualSpec.partitions.length === spec.partitions.length)
+ actualSpec.partitions.zip(spec.partitions).foreach { case (actual, expected) =>
+ assert(actual === expected)
+ }
assert(actualSpec === spec)
}
@@ -314,7 +319,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
PartitionSpec(
StructType(Seq(
StructField("a", DoubleType),
- StructField("b", StringType))),
+ StructField("b", NullType))),
Seq(
Partition(InternalRow(10, null),
s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
Partition(InternalRow(10.5, null),
@@ -324,6 +329,32 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
s"hdfs://host:9000/path1",
s"hdfs://host:9000/path2"),
PartitionSpec.emptySpec)
+
+ // The cases below check the resolution for type conflicts.
+ val t1 = Timestamp.valueOf("2014-01-01 00:00:00.0").getTime * 1000
+ val t2 = Timestamp.valueOf("2014-01-01 00:01:00.0").getTime * 1000
+ // Values in column 'a' are inferred as null, date and timestamp each, and timestamp is set
+ // as a common type.
+ // Values in column 'b' are inferred as integer, decimal(22, 0) and null, and decimal(22, 0)
+ // is set as a common type.
+ check(Seq(
+ s"hdfs://host:9000/path/a=$defaultPartitionName/b=0",
+ s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111",
+ s"hdfs://host:9000/path/a=2014-01-01
00%3A01%3A00.0/b=$defaultPartitionName"),
+ PartitionSpec(
+ StructType(Seq(
+ StructField("a", TimestampType),
+ StructField("b", DecimalType(22, 0)))),
+ Seq(
+ Partition(
+ InternalRow(null, Decimal(0)),
+ s"hdfs://host:9000/path/a=$defaultPartitionName/b=0"),
+ Partition(
+ InternalRow(t1, Decimal(s"${Long.MaxValue}111")),
+ s"hdfs://host:9000/path/a=2014-01-01/b=${Long.MaxValue}111"),
+ Partition(
+ InternalRow(t2, null),
+ s"hdfs://host:9000/path/a=2014-01-01
00%3A01%3A00.0/b=$defaultPartitionName"))))
}
test("parse partitions with type inference disabled") {
@@ -395,7 +426,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
PartitionSpec(
StructType(Seq(
StructField("a", StringType),
- StructField("b", StringType))),
+ StructField("b", NullType))),
Seq(
Partition(InternalRow(UTF8String.fromString("10"), null),
s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
@@ -1067,4 +1098,26 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
checkAnswer(spark.read.load(path.getAbsolutePath), df)
}
}
+
+ test("Resolve type conflicts - decimals, dates and timestamps in partition
column") {
+ withTempPath { path =>
+ val df = Seq((1, "2014-01-01"), (2, "2016-01-01"), (3, "2015-01-01
00:01:00")).toDF("i", "ts")
+ df.write.format("parquet").partitionBy("ts").save(path.getAbsolutePath)
+ checkAnswer(
+ spark.read.load(path.getAbsolutePath),
+ Row(1, Timestamp.valueOf("2014-01-01 00:00:00")) ::
+ Row(2, Timestamp.valueOf("2016-01-01 00:00:00")) ::
+ Row(3, Timestamp.valueOf("2015-01-01 00:01:00")) :: Nil)
+ }
+
+ withTempPath { path =>
+ val df = Seq((1, "1"), (2, "3"), (3, "2" * 30)).toDF("i", "decimal")
+ df.write.format("parquet").partitionBy("decimal").save(path.getAbsolutePath)
+ checkAnswer(
+ spark.read.load(path.getAbsolutePath),
+ Row(1, BigDecimal("1")) ::
+ Row(2, BigDecimal("3")) ::
+ Row(3, BigDecimal("2" * 30)) :: Nil)
+ }
+ }
}