This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 6bc088f [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading
parquet non-decimal fields as decimal
6bc088f is described below
commit 6bc088fd0499a28201dc6c2a25836d02d769e14d
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed Jan 27 09:34:31 2021 -0800
[SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet
non-decimal fields as decimal
This is a followup of https://github.com/apache/spark/pull/31319 .
When reading parquet int/long as decimal, the behavior should be the same
as reading int/long and then casting to the decimal type. This PR changes the
reader to match that expected behavior.
When reading parquet binary as decimal, we don't really know how to
interpret the binary (it may come from a string), so the read should fail.
This PR changes the reader to match that expected behavior.
To make the behavior more sane.
Yes, but it's a followup.
updated test
Closes #31357 from cloud-fan/bug.
Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 2dbb7d5af8f498e49488cd8876bd3d0b083723b7)
Signed-off-by: Dongjoon Hyun <[email protected]>
---
.../datasources/parquet/ParquetRowConverter.scala | 48 +++++++++++-------
.../scala/org/apache/spark/sql/SQLQuerySuite.scala | 58 ++++++++++++----------
2 files changed, 60 insertions(+), 46 deletions(-)
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 0d22fe5..5878bb0 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -210,19 +210,6 @@ private[parquet] class ParquetRowConverter(
}
/**
- * Get a precision and a scale to interpret parquet decimal values.
- * 1. If there is a decimal metadata, we read decimal values with the given
precision and scale.
- * 2. If there is no metadata, we read decimal values with scale `0` because
it's plain integers
- * when it is written into INT32/INT64/BINARY/FIXED_LEN_BYTE_ARRAY types.
- */
- private def getPrecisionAndScale(parquetType: Type, t: DecimalType): (Int,
Int) = {
- val metadata = parquetType.asPrimitiveType().getDecimalMetadata
- val precision = if (metadata == null) t.precision else
metadata.getPrecision()
- val scale = if (metadata == null) 0 else metadata.getScale()
- (precision, scale)
- }
-
- /**
* Creates a converter for the given Parquet type `parquetType` and Spark
SQL data type
* `catalystType`. Converted values are handled by `updater`.
*/
@@ -249,20 +236,43 @@ private[parquet] class ParquetRowConverter(
// For INT32 backed decimals
case t: DecimalType if
parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
- val (precision, scale) = getPrecisionAndScale(parquetType, t)
- new ParquetIntDictionaryAwareDecimalConverter(precision, scale,
updater)
+ val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+ if (metadata == null) {
+ // If the column is a plain INT32, we should pick the precision that
can host the largest
+ // INT32 value.
+ new ParquetIntDictionaryAwareDecimalConverter(
+ DecimalType.IntDecimal.precision, 0, updater)
+ } else {
+ new ParquetIntDictionaryAwareDecimalConverter(
+ metadata.getPrecision, metadata.getScale, updater)
+ }
// For INT64 backed decimals
case t: DecimalType if
parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
- val (precision, scale) = getPrecisionAndScale(parquetType, t)
- new ParquetLongDictionaryAwareDecimalConverter(precision, scale,
updater)
+ val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+ if (metadata == null) {
+ // If the column is a plain INT64, we should pick the precision that
can host the largest
+ // INT64 value.
+ new ParquetLongDictionaryAwareDecimalConverter(
+ DecimalType.LongDecimal.precision, 0, updater)
+ } else {
+ new ParquetLongDictionaryAwareDecimalConverter(
+ metadata.getPrecision, metadata.getScale, updater)
+ }
// For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
case t: DecimalType
if parquetType.asPrimitiveType().getPrimitiveTypeName ==
FIXED_LEN_BYTE_ARRAY ||
parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
- val (precision, scale) = getPrecisionAndScale(parquetType, t)
- new ParquetBinaryDictionaryAwareDecimalConverter(precision, scale,
updater)
+ val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+ if (metadata == null) {
+ throw new RuntimeException(s"Unable to create Parquet converter for
${t.typeName} " +
+ s"whose Parquet type is $parquetType without decimal metadata.
Please read this " +
+ "column/field as Spark BINARY type." )
+ } else {
+ new ParquetBinaryDictionaryAwareDecimalConverter(
+ metadata.getPrecision, metadata.getScale, updater)
+ }
case t: DecimalType =>
throw new RuntimeException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 1af50bf..a2efed6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3143,52 +3143,56 @@ class SQLQuerySuite extends QueryTest with
SharedSQLContext {
}
test("SPARK-34212 Parquet should read decimals correctly") {
- // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is
binary-decimal (16 bytes)
- val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS
DECIMAL(36, 2)) c")
+ def readParquet(schema: String, path: File): DataFrame = {
+ spark.read.schema(schema).parquet(path.toString)
+ }
withTempPath { path =>
+ // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is
binary-decimal (16 bytes)
+ val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS
DECIMAL(36, 2)) c")
df.write.parquet(path.toString)
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
- checkAnswer(spark.read.schema(schema1).parquet(path.toString), df)
+ checkAnswer(readParquet(schema1, path), df)
val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
- checkAnswer(spark.read.schema(schema2).parquet(path.toString), Row(1,
1.2, 1.2))
+ checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
}
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
- val e1 = intercept[SparkException] {
- spark.read.schema("a DECIMAL(3, 2)").parquet(path.toString).collect()
- }.getCause.getCause
- assert(e1.isInstanceOf[SchemaColumnConvertNotSupportedException])
-
- val e2 = intercept[SparkException] {
- spark.read.schema("b DECIMAL(18,
1)").parquet(path.toString).collect()
- }.getCause.getCause
- assert(e2.isInstanceOf[SchemaColumnConvertNotSupportedException])
-
- val e3 = intercept[SparkException] {
- spark.read.schema("c DECIMAL(37,
1)").parquet(path.toString).collect()
- }.getCause.getCause
- assert(e3.isInstanceOf[SchemaColumnConvertNotSupportedException])
+ Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach
{ schema =>
+ val e = intercept[SparkException] {
+ readParquet(schema, path).collect()
+ }.getCause.getCause
+ assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+ }
}
}
+ // tests for parquet types without decimal metadata.
withTempPath { path =>
- val df2 = sql(s"SELECT 1 a, ${Int.MaxValue + 1L} b")
- df2.write.parquet(path.toString)
+ val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c,
CAST('1.2' AS BINARY) d")
+ df.write.parquet(path.toString)
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
- val schema = "a DECIMAL(3, 2), b DECIMAL(17, 2)"
- checkAnswer(spark.read.schema(schema).parquet(path.toString),
- Row(BigDecimal(100, 2), BigDecimal((Int.MaxValue + 1L) * 100, 2)))
+ checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+ checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+ checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT
123456.0"))
+ checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
+ checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
+ val e = intercept[SparkException] {
+ readParquet("d DECIMAL(3, 2)", path).collect()
+ }.getCause
+ assert(e.getMessage.contains("Please read this column/field as Spark
BINARY type"))
}
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
- val e = intercept[SparkException] {
- spark.read.schema("a DECIMAL(3, 2)").parquet(path.toString).collect()
- }.getCause.getCause
- assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+ Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach
{ schema =>
+ val e = intercept[SparkException] {
+ readParquet(schema, path).collect()
+ }.getCause.getCause
+ assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]