This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new d5e001b [SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite d5e001b is described below commit d5e001bf3e386fba97926866f5865b4d801663b2 Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Tue Apr 13 09:04:47 2021 +0000 [SPARK-34212][SQL][FOLLOWUP] Move the added test to ParquetQuerySuite What changes were proposed in this pull request? This PR moves the added test from `SQLQuerySuite` to `ParquetQuerySuite`. Why are the changes needed? 1. The test can then be run by both `ParquetV1QuerySuite` and `ParquetV2QuerySuite`. 2. It reduces the testing time of `SQLQuerySuite` (SQLQuerySuite ~ 3 min 17 sec, ParquetV1QuerySuite ~ 27 sec). Does this PR introduce any user-facing change? No. How was this patch tested? Unit test. Closes #32090 from wangyum/SPARK-34212. Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 64 ---------------------- .../datasources/parquet/ParquetQuerySuite.scala | 63 ++++++++++++++++++++- 2 files changed, 62 insertions(+), 65 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index abb0650..8280371 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -33,7 +33,6 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.command.FunctionsCommand -import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan @@ -3589,69 +3588,6 @@ class SQLQuerySuite extends 
QueryTest with SharedSparkSession with AdaptiveSpark }) } - test("SPARK-34212 Parquet should read decimals correctly") { - def readParquet(schema: String, path: File): DataFrame = { - spark.read.schema(schema).parquet(path.toString) - } - - withTempPath { path => - // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes) - val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c") - df.write.parquet(path.toString) - - Seq(true, false).foreach { vectorizedReader => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReader.toString) { - // We can read the decimal parquet field with a larger precision, if scale is the same. - val schema = "a DECIMAL(9, 1), b DECIMAL(18, 2), c DECIMAL(38, 2)" - checkAnswer(readParquet(schema, path), df) - } - } - - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { - val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" - checkAnswer(readParquet(schema1, path), df) - val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" - checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2)) - } - - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { - Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach { schema => - val e = intercept[SparkException] { - readParquet(schema, path).collect() - }.getCause.getCause - assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException]) - } - } - } - - // tests for parquet types without decimal metadata. 
- withTempPath { path => - val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") - df.write.parquet(path.toString) - - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { - checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) - checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) - checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) - checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null)) - checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c")) - val e = intercept[SparkException] { - readParquet("d DECIMAL(3, 2)", path).collect() - }.getCause - assert(e.getMessage.contains("Please read this column/field as Spark BINARY type")) - } - - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { - Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach { schema => - val e = intercept[SparkException] { - readParquet(schema, path).collect() - }.getCause.getCause - assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException]) - } - } - } - } - test("SPARK-34421: Resolve temporary objects in permanent views with CTEs") { val tempFuncName = "temp_func" withUserDefinedFunction((tempFuncName, true)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 05d305a9b..358d06c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow import org.apache.spark.sql.execution.FileSourceScanExec -import 
org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol +import org.apache.spark.sql.execution.datasources.{SchemaColumnConvertNotSupportedException, SQLHadoopMapReduceCommitProtocol} import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT, SingleElement} import org.apache.spark.sql.execution.datasources.v2.BatchScanExec import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan @@ -840,6 +840,67 @@ abstract class ParquetQuerySuite extends QueryTest with ParquetTest with SharedS testMigration(fromTsType = "INT96", toTsType = "TIMESTAMP_MICROS") testMigration(fromTsType = "TIMESTAMP_MICROS", toTsType = "INT96") } + + test("SPARK-34212 Parquet should read decimals correctly") { + def readParquet(schema: String, path: File): DataFrame = { + spark.read.schema(schema).parquet(path.toString) + } + + withTempPath { path => + // a is int-decimal (4 bytes), b is long-decimal (8 bytes), c is binary-decimal (16 bytes) + val df = sql("SELECT 1.0 a, CAST(1.23 AS DECIMAL(17, 2)) b, CAST(1.23 AS DECIMAL(36, 2)) c") + df.write.parquet(path.toString) + + withAllParquetReaders { + // We can read the decimal parquet field with a larger precision, if scale is the same. 
+ val schema = "a DECIMAL(9, 1), b DECIMAL(18, 2), c DECIMAL(38, 2)" + checkAnswer(readParquet(schema, path), df) + } + + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { + val schema1 = "a DECIMAL(3, 2), b DECIMAL(18, 3), c DECIMAL(37, 3)" + checkAnswer(readParquet(schema1, path), df) + val schema2 = "a DECIMAL(3, 0), b DECIMAL(18, 1), c DECIMAL(37, 1)" + checkAnswer(readParquet(schema2, path), Row(1, 1.2, 1.2)) + } + + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { + Seq("a DECIMAL(3, 2)", "b DECIMAL(18, 1)", "c DECIMAL(37, 1)").foreach { schema => + val e = intercept[SparkException] { + readParquet(schema, path).collect() + }.getCause.getCause + assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException]) + } + } + } + + // tests for parquet types without decimal metadata. + withTempPath { path => + val df = sql(s"SELECT 1 a, 123456 b, ${Int.MaxValue.toLong * 10} c, CAST('1.2' AS BINARY) d") + df.write.parquet(path.toString) + + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") { + checkAnswer(readParquet("a DECIMAL(3, 2)", path), sql("SELECT 1.00")) + checkAnswer(readParquet("b DECIMAL(3, 2)", path), Row(null)) + checkAnswer(readParquet("b DECIMAL(11, 1)", path), sql("SELECT 123456.0")) + checkAnswer(readParquet("c DECIMAL(11, 1)", path), Row(null)) + checkAnswer(readParquet("c DECIMAL(13, 0)", path), df.select("c")) + val e = intercept[SparkException] { + readParquet("d DECIMAL(3, 2)", path).collect() + }.getCause + assert(e.getMessage.contains("Please read this column/field as Spark BINARY type")) + } + + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") { + Seq("a DECIMAL(3, 2)", "c DECIMAL(18, 1)", "d DECIMAL(37, 1)").foreach { schema => + val e = intercept[SparkException] { + readParquet(schema, path).collect() + }.getCause.getCause + assert(e.isInstanceOf[SchemaColumnConvertNotSupportedException]) + } + } + } + } } class ParquetV1QuerySuite extends ParquetQuerySuite 
{ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org