Franklyn Dsouza created SPARK-19299:
---------------------------------------
Summary: Nulls in non-nullable columns cause data corruption in Parquet
Key: SPARK-19299
URL: https://issues.apache.org/jira/browse/SPARK-19299
Project: Spark
Issue Type: Bug
Components: PySpark, Spark Core
Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.0
Reporter: Franklyn Dsouza
The problem we're seeing is that if a null occurs in a non-nullable field and is
written to Parquet, the resulting file is corrupt and cannot be read back
correctly.
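A defensive check before writing can catch this case. The sketch below is plain Python under stated assumptions: the schema is modelled as hypothetical `(name, nullable)` pairs rather than a PySpark `StructType`, and `find_required_nulls` is an illustrative helper, not a Spark API. In a real job the same idea could be expressed as a null filter on each non-nullable column of the DataFrame.

{code:python}
from typing import Any, List, Sequence, Tuple

# Hypothetical stand-in for a StructType: one (field_name, nullable) pair per column.
Field = Tuple[str, bool]


def find_required_nulls(
    rows: Sequence[Sequence[Any]], fields: Sequence[Field]
) -> List[Tuple[int, str]]:
    """Return (row_index, field_name) for every None found in a non-nullable field."""
    problems = []
    for i, row in enumerate(rows):
        for value, (name, nullable) in zip(row, fields):
            if value is None and not nullable:
                problems.append((i, name))
    return problems


# Mirrors the rows that df.collect() returns in the example further down.
fields = [("index", False), ("long", False)]
rows = [(1, 6), (2, 7), (3, None), (4, 8), (5, 9)]
print(find_required_nulls(rows, fields))  # [(2, 'long')]
{code}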
One way this can occur is when a long value in Python is too big to fit into a
Spark LongType, in which case it is converted to null.
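For reference, Spark's `LongType` is a signed 64-bit integer, so valid values run from -2^63 to 2^63 - 1, and the `2 ** 64` used in the example below is out of range. A minimal range check in plain Python (the helper name `fits_in_long_type` is hypothetical) can flag such values before they reach `createDataFrame`:

{code:python}
# Spark's LongType is a signed 64-bit integer.
LONG_MIN = -(2 ** 63)
LONG_MAX = 2 ** 63 - 1


def fits_in_long_type(value: int) -> bool:
    """Return True if a Python int can be stored in a 64-bit signed LongType."""
    return LONG_MIN <= value <= LONG_MAX


data = [(1, 6), (2, 7), (3, 2 ** 64), (4, 8), (5, 9)]
# Keep only the rows that contain at least one out-of-range value.
overflowing = [row for row in data if not all(fits_in_long_type(v) for v in row)]
print(overflowing)  # [(3, 18446744073709551616)]
{code}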
We're also seeing that the behaviour differs depending on whether the vectorized
reader is enabled.
Here's an example in PySpark:
{code}
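# Run in the PySpark shell, where `sqlCtx` is predefined.
from pyspark.sql import types

data = [
    (1, 6),
    (2, 7),
    (3, 2 ** 64),  # too large for LongType; shows up as None in df.collect()
    (4, 8),
    (5, 9)
]

# Both columns are declared non-nullable.
schema = types.StructType([
    types.StructField("index", types.LongType(), False),
    types.StructField("long", types.LongType(), False),
])

df = sqlCtx.createDataFrame(data, schema)
df.collect()

df.write.parquet("corrupt_parquet")

# Uncomment to read with the vectorized Parquet reader disabled:
# sqlCtx.setConf("spark.sql.parquet.enableVectorizedReader", "false")
df_parquet = sqlCtx.read.parquet("corrupt_parquet/*.parquet")
df_parquet.collect()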
{code}
With the vectorized reader enabled, this produces:
{code}
In [2]: df.collect()
Out[2]:
[Row(index=1, long=6),
Row(index=2, long=7),
Row(index=3, long=None),
Row(index=4, long=8),
Row(index=5, long=9)]
In [3]: df_parquet.collect()
Out[3]:
[Row(index=1, long=6),
Row(index=2, long=7),
Row(index=3, long=8),
Row(index=4, long=9),
Row(index=5, long=5)]
{code}
As you can see, reading the data back from disk causes values to shift up and
between columns.
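One simplified way to picture the shift: for a non-nullable (REQUIRED) column, Parquet stores no definition levels, so a reader assumes every row has a value and consumes values positionally. The model below assumes, consistent with the output above and the stack trace below, that the writer recorded five rows but emitted only four PLAIN-encoded INT64 values (8 bytes each, little-endian). It is a toy model of the layout, not parquet-mr or Spark code, and `encode_plain_int64` and `read_positionally` are hypothetical names. It reproduces the upward shift in rows 3 and 4; in the report the last row instead picked up an unrelated value (5), which this model returns as `None`.

{code:python}
import struct
from typing import List, Optional


def encode_plain_int64(values: List[Optional[int]]) -> bytes:
    """Encode values as consecutive 8-byte little-endian ints.

    Assumption: a None is skipped rather than encoded, because a required
    column has no definition levels to mark it as missing.
    """
    return b"".join(struct.pack("<q", v) for v in values if v is not None)


def read_positionally(buf: bytes, num_rows: int) -> List[Optional[int]]:
    """Read one value per row, as a reader of a required column would.

    Rows past the end of the buffer come back as None in this model.
    """
    out: List[Optional[int]] = []
    for row in range(num_rows):
        offset = row * 8
        if offset + 8 <= len(buf):
            out.append(struct.unpack_from("<q", buf, offset)[0])
        else:
            out.append(None)
    return out


written = [6, 7, None, 8, 9]  # the "long" column as seen by df.collect()
buf = encode_plain_int64(written)
print(read_positionally(buf, len(written)))  # [6, 7, 8, 9, None]
{code}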
With the vectorized reader disabled, we are completely unable to read the file:
{code}
Py4JJavaError: An error occurred while calling o143.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost): org.apache.parquet.io.ParquetDecodingException: Can not read value at 4 in block 0 in file file:/Users/franklyndsouza/dev/starscream/corrupt/part-r-00000-4fa5aee8-2138-4e0c-b6d8-22a418d90fd3.snappy.parquet
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:228)
    at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:91)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.parquet.io.ParquetDecodingException: Can't read value in column [long] INT64 at value 5 out of 5, 5 out of 5 in currentPage. repetition level: 0, definition level: 0
    at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:462)
    at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:364)
    at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:405)
    at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:209)
    ... 19 more
Caused by: org.apache.parquet.io.ParquetDecodingException: could not read long
    at org.apache.parquet.column.values.plain.PlainValuesReader$LongPlainValuesReader.readLong(PlainValuesReader.java:131)
    at org.apache.parquet.column.impl.ColumnReaderImpl$2$4.read(ColumnReaderImpl.java:258)
    at org.apache.parquet.column.impl.ColumnReaderImpl.readValue(ColumnReaderImpl.java:458)
    ... 22 more
Caused by: java.io.EOFException
    at org.apache.parquet.bytes.LittleEndianDataInputStream.readFully(LittleEndianDataInputStream.java:90)
    at org.apache.parquet.bytes.LittleEndianDataInputStream.readLong(LittleEndianDataInputStream.java:377)
    at org.apache.parquet.column.values.plain.PlainValuesReader$LongPlainValuesReader.readLong(PlainValuesReader.java:129)
{code}
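The innermost cause above is an `EOFException` while reading the fifth INT64 value of a page that reports five values. A strict reader over a simplified version of the same layout fails at the same point. This is a toy sketch, assuming four 8-byte little-endian values were written for five rows; the name `read_int64_strict` is hypothetical and its error message only mimics the trace, it is not parquet-mr's reader.

{code:python}
import struct
from typing import List


def read_int64_strict(buf: bytes, num_values: int) -> List[int]:
    """Read num_values little-endian INT64 values, failing if the data runs out."""
    values = []
    for i in range(num_values):
        offset = i * 8
        if offset + 8 > len(buf):
            # Mirrors the shape of the reported error, not parquet-mr's exact message.
            raise EOFError(f"could not read long at value {i + 1} out of {num_values}")
        values.append(struct.unpack_from("<q", buf, offset)[0])
    return values


# Four values were written, but the page reports five.
buf = struct.pack("<4q", 6, 7, 8, 9)
try:
    read_int64_strict(buf, 5)
except EOFError as exc:
    print(exc)  # could not read long at value 5 out of 5
{code}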
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)