[
https://issues.apache.org/jira/browse/SPARK-46056?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Cosmin Dumitru updated SPARK-46056:
-----------------------------------
Description:
The scenario is a bit more involved than the title suggests, but it's not that far-fetched (a minimal repro sketch follows the steps):
# Write a Parquet file with one column.
# Evolve the schema: add a new column with a DecimalType wide enough that it doesn't fit in a long, and give it a default value.
# Try to read the file with the new schema.
# NPE.
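A minimal repro sketch, assuming the column DEFAULT support available since Spark 3.4; the table name, column names, and values are illustrative, not taken from the report:
{code:java}
import org.apache.spark.sql.SparkSession;

public class Spark46056Repro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[1]").appName("SPARK-46056").getOrCreate();
    spark.sql("CREATE TABLE t (a INT) USING parquet");
    spark.sql("INSERT INTO t VALUES (1)");
    // Precision 25 > Decimal.MAX_LONG_DIGITS (18), so the unscaled value
    // no longer fits in a long and must be stored as a byte array.
    spark.sql("ALTER TABLE t ADD COLUMN d DECIMAL(25, 5) DEFAULT 12.345");
    spark.sql("SELECT * FROM t").show();  // NPE without the fix
    spark.stop();
  }
}
{code}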
The issue lies in how the column vector stores DecimalTypes: it incorrectly assumes that they fit in a long and tries to write them to the associated long array.
[https://github.com/apache/spark/blob/master/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java#L724]
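To make the width limit concrete: Long.MAX_VALUE has 19 digits, so Spark caps long-backed decimals at 18 digits (Decimal.MAX_LONG_DIGITS) and anything wider needs the byte-array representation. A standalone illustration with the plain JDK, nothing Spark-specific:
{code:java}
import java.math.BigDecimal;

public class DecimalWidth {
  public static void main(String[] args) {
    // 25 significant digits: the unscaled value needs more than 63 bits,
    // so it cannot live in the column vector's long array.
    BigDecimal wide = new BigDecimal("1234567890123456789012345");
    System.out.println(wide.unscaledValue().bitLength()); // 81 (> 63)
    System.out.println(Long.MAX_VALUE); // 9223372036854775807, 19 digits
  }
}
{code}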
In OnHeapColumnVector, which extends WritableColumnVector, reserveInternal() checks whether the type is too wide for a long and initializes the array elements accordingly.
[https://github.com/apache/spark/blob/b568ba43f0dd80130bca1bf86c48d0d359e57883/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java#L568]
isArray() returns true if the type is a byte-array decimal type, i.e. a DecimalType too wide to fit in a long.
[https://github.com/apache/spark/blob/afebf8e6c9f24d264580d084cb12e3e6af120a5a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java#L945]
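Putting these together, the failure path looks roughly like the sketch below; it paraphrases and simplifies the linked methods, see the links above for the exact code:
{code:java}
// reserveInternal() sees isArray() == true for the wide DecimalType, so it
// allocates only the child byte-array storage and leaves longData null.
// ParquetColumnVector's constructor then appends the default value via
// appendObjects() -> appendLongs() -> putLongs(), which boils down to:
private long[] longData;  // never allocated for byte-array decimal types

public void putLongs(int rowId, int count, long value) {
  for (int i = 0; i < count; ++i) {
    longData[i + rowId] = value;  // longData == null -> NullPointerException
  }
}
{code}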
Without the fix, reading fails with:
{code:java}
[info] Cause: java.lang.NullPointerException:
[info]   at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putLongs(OnHeapColumnVector.java:370)
[info]   at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendLongs(WritableColumnVector.java:611)
[info]   at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendObjects(WritableColumnVector.java:745)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetColumnVector.<init>(ParquetColumnVector.java:95)
[info]   at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:286)
[info]   at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initBatch(VectorizedParquetRecordReader.java:306)
[info]   at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.$anonfun$buildReaderWithPartitionValues$2(ParquetFileFormat.scala:293)
[info]   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:218)
[info]   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:280)
[info]   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
[info]   at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:614)
[info]   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
[info]   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
[info]   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
[info]   at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
[info]   at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
[info]   at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:891){code}
Fix PR: [https://github.com/apache/spark/pull/43960]
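The natural shape of such a fix is a byte-array branch in the decimal append path; the sketch below is an assumption about what the patch does, not the actual diff from the PR:
{code:java}
// Hypothetical shape of the fix inside appendObjects(): route wide
// decimals to the byte-array path instead of appendLongs().
Decimal d = Decimal.apply((BigDecimal) value, dt.precision(), dt.scale());
if (dt.precision() <= Decimal.MAX_LONG_DIGITS()) {
  appendLongs(length, d.toUnscaledLong());
} else {
  byte[] bytes = d.toJavaBigDecimal().unscaledValue().toByteArray();
  for (int i = 0; i < length; i++) {
    appendByteArray(bytes, 0, bytes.length);
  }
}
{code}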
> Vectorized parquet reader throws NPE when reading files with DecimalType default values
> ---------------------------------------------------------------------------------------
>
> Key: SPARK-46056
> URL: https://issues.apache.org/jira/browse/SPARK-46056
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.4.0, 3.5.0
> Reporter: Cosmin Dumitru
> Priority: Major
>