This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 323679f  [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading 
parquet non-decimal fields as decimal
323679f is described below

commit 323679f75250d279110f9586bc7758a12b0b68bd
Author: Wenchen Fan <[email protected]>
AuthorDate: Wed Jan 27 09:34:31 2021 -0800

    [SPARK-34212][SQL][FOLLOWUP] Refine the behavior of reading parquet 
non-decimal fields as decimal
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/31319 .
    
    When reading parquet int/long as decimal, the behavior should be the same 
as reading int/long and then casting to the decimal type. This PR implements 
that expected behavior.
    
    When reading parquet binary as decimal, we don't really know how to 
interpret the binary (it may come from a string), so the read should fail. 
This PR implements that expected behavior.
    
    ### Why are the changes needed?
    
    To make the behavior more sane.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but it's a followup.
    
    ### How was this patch tested?
    
    updated test
    
    Closes #31357 from cloud-fan/bug.
    
    Authored-by: Wenchen Fan <[email protected]>
    Signed-off-by: Dongjoon Hyun <[email protected]>
    (cherry picked from commit 2dbb7d5af8f498e49488cd8876bd3d0b083723b7)
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 .../datasources/parquet/ParquetRowConverter.scala  | 48 +++++++++++-------
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 58 ++++++++++++----------
 2 files changed, 60 insertions(+), 46 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 5005d41..151bb13 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -234,19 +234,6 @@ private[parquet] class ParquetRowConverter(
   }
 
   /**
-   * Get a precision and a scale to interpret parquet decimal values.
-   * 1. If there is a decimal metadata, we read decimal values with the given 
precision and scale.
-   * 2. If there is no metadata, we read decimal values with scale `0` because 
it's plain integers
-   *    when it is written into INT32/INT64/BINARY/FIXED_LEN_BYTE_ARRAY types.
-   */
-  private def getPrecisionAndScale(parquetType: Type, t: DecimalType): (Int, 
Int) = {
-    val metadata = parquetType.asPrimitiveType().getDecimalMetadata
-    val precision = if (metadata == null) t.precision else 
metadata.getPrecision()
-    val scale = if (metadata == null) 0 else metadata.getScale()
-    (precision, scale)
-  }
-
-  /**
    * Creates a converter for the given Parquet type `parquetType` and Spark 
SQL data type
    * `catalystType`. Converted values are handled by `updater`.
    */
@@ -273,20 +260,43 @@ private[parquet] class ParquetRowConverter(
 
       // For INT32 backed decimals
       case t: DecimalType if 
parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
-        val (precision, scale) = getPrecisionAndScale(parquetType, t)
-        new ParquetIntDictionaryAwareDecimalConverter(precision, scale, 
updater)
+        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+        if (metadata == null) {
+          // If the column is a plain INT32, we should pick the precision that 
can host the largest
+          // INT32 value.
+          new ParquetIntDictionaryAwareDecimalConverter(
+            DecimalType.IntDecimal.precision, 0, updater)
+        } else {
+          new ParquetIntDictionaryAwareDecimalConverter(
+            metadata.getPrecision, metadata.getScale, updater)
+        }
 
       // For INT64 backed decimals
       case t: DecimalType if 
parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
-        val (precision, scale) = getPrecisionAndScale(parquetType, t)
-        new ParquetLongDictionaryAwareDecimalConverter(precision, scale, 
updater)
+        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+        if (metadata == null) {
+          // If the column is a plain INT64, we should pick the precision that 
can host the largest
+          // INT64 value.
+          new ParquetLongDictionaryAwareDecimalConverter(
+            DecimalType.LongDecimal.precision, 0, updater)
+        } else {
+          new ParquetLongDictionaryAwareDecimalConverter(
+            metadata.getPrecision, metadata.getScale, updater)
+        }
 
       // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
       case t: DecimalType
         if parquetType.asPrimitiveType().getPrimitiveTypeName == 
FIXED_LEN_BYTE_ARRAY ||
            parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
-        val (precision, scale) = getPrecisionAndScale(parquetType, t)
-        new ParquetBinaryDictionaryAwareDecimalConverter(precision, scale, 
updater)
+        val metadata = parquetType.asPrimitiveType().getDecimalMetadata
+        if (metadata == null) {
+          throw new RuntimeException(s"Unable to create Parquet converter for 
${t.typeName} " +
+            s"whose Parquet type is $parquetType without decimal metadata. 
Please read this " +
+            "column/field as Spark BINARY type." )
+        } else {
+          new ParquetBinaryDictionaryAwareDecimalConverter(
+            metadata.getPrecision, metadata.getScale, updater)
+        }
 
       case t: DecimalType =>
         throw new RuntimeException(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c8e02c7..ed6cdb5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -3578,52 +3578,56 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
   }
 
   test("SPARK-34212 Parquet should read decimals correctly") {
-    // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is 
binary-decimal (16 bytes)
-    val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS 
DECIMAL(36, 2)) c")
+    def readParquet(schema: String, path: File): DataFrame = {
+      spark.read.schema(schema).parquet(path.toString)
+    }
 
     withTempPath { path =>
+      // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is 
binary-decimal (16 bytes)
+      val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS 
DECIMAL(36, 2)) c")
       df.write.parquet(path.toString)
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
         val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)"
-        checkAnswer(spark.read.schema(schema1).parquet(path.toString), df)
+        checkAnswer(readParquet(schema1, path), df)
         val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)"
-        checkAnswer(spark.read.schema(schema2).parquet(path.toString), Row(1, 
1.2, 1.2))
+        checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2))
       }
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
-        val e1 = intercept[SparkException] {
-          spark.read.schema("a DECIMAL(3, 2)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e1.isInstanceOf[SchemaColumnConvertNotSupportedException])
-
-        val e2 = intercept[SparkException] {
-          spark.read.schema("b DECIMAL(18, 
1)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e2.isInstanceOf[SchemaColumnConvertNotSupportedException])
-
-        val e3 = intercept[SparkException] {
-          spark.read.schema("c DECIMAL(37, 
1)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e3.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach 
{ schema =>
+          val e = intercept[SparkException] {
+            readParquet(schema, path).collect()
+          }.getCause.getCause
+          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        }
       }
     }
 
+    // tests for parquet types without decimal metadata.
     withTempPath { path =>
-      val df2 = sql(s"SELECT 1 a, ${Int.MaxValue + 1L} b, CAST(2 AS BINARY) c")
-      df2.write.parquet(path.toString)
+      val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, 
CAST('1.2' AS BINARY) d")
+      df.write.parquet(path.toString)
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
-        val schema = "a DECIMAL(3, 2), b DECIMAL(17, 2), c DECIMAL(36, 2)"
-        checkAnswer(spark.read.schema(schema).parquet(path.toString),
-          Row(BigDecimal(100, 2), BigDecimal((Int.MaxValue + 1L) * 100, 2), 
BigDecimal(200, 2)))
+        checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00"))
+        checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null))
+        checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 
123456.0"))
+        checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null))
+        checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c"))
+        val e = intercept[SparkException] {
+          readParquet("d DECIMAL(3, 2)", path).collect()
+        }.getCause
+        assert(e.getMessage.contains("Please read this column/field as Spark 
BINARY type"))
       }
 
       withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
-        val e = intercept[SparkException] {
-          spark.read.schema("a DECIMAL(3, 2)").parquet(path.toString).collect()
-        }.getCause.getCause
-        assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach 
{ schema =>
+          val e = intercept[SparkException] {
+            readParquet(schema, path).collect()
+          }.getCause.getCause
+          assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException])
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to