[ https://issues.apache.org/jira/browse/SPARK-26874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matt Cheah resolved SPARK-26874.
--------------------------------
    Resolution: Not A Problem

I did some more digging with [~rdblue] and we discovered that the root cause is 
on the Parquet side, specifically in a feature that hasn't been released yet. 
We'll continue pursuing solutions there.

> With PARQUET-1414, Spark can erroneously write empty pages
> ----------------------------------------------------------
>
>                 Key: SPARK-26874
>                 URL: https://issues.apache.org/jira/browse/SPARK-26874
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output, SQL
>    Affects Versions: 2.4.0
>            Reporter: Matt Cheah
>            Priority: Major
>
> This issue will only come up when Spark upgrades its Parquet dependency to 
> the latest parquet-mr/master. It is being filed to proactively fix the bug 
> before we upgrade - it's not something the current unit tests would easily 
> catch, so it could go unnoticed until the community runs scale tests, e.g. 
> during an RC phase.
> Parquet introduced a new feature to limit the number of rows written to a 
> page in a column chunk - see PARQUET-1414. Previously, Parquet would only 
> flush pages to the column store once the page writer had filled its buffer 
> with a certain number of bytes. The idea of the Parquet patch is to also make 
> page writers flush to the column store once they have been given a certain 
> number of rows - the default limit is 20000 rows.
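> For reference, the new behaviour is controlled through a Hadoop configuration 
> property; a minimal sketch of overriding it from Spark is below (the property 
> name {{parquet.page.row.count.limit}} is the one used in the reproduction 
> further down):
> {code:java}
> // Lower the per-page row limit so that page boundaries are cut far more
> // often; this makes the empty-page condition much easier to hit on small data.
> spark.sparkContext.hadoopConfiguration
>   .set("parquet.page.row.count.limit", "1000")
> {code}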
> The patch makes the Spark Parquet Data Source erroneously write empty pages 
> to column chunks, making the Parquet file ultimately unreadable with 
> exceptions like these:
>  
> {code:java}
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/private/var/folders/7f/rrg37sj15r33cwlxbhg7fyg5080xxg/T/spark-2c1b1e5d-c132-42fe-98e1-b523b9baa176/parquet-data/part-00002-b8b4bb3c-b49f-440b-b96c-53fcd19786ad-c000.snappy.parquet
>  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
>  at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
>  at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
>  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
>  ... 18 more
> Caused by: java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.
>  at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:53)
>  at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readNext(RunLengthBitPackingHybridDecoder.java:80)
>  at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readInt(RunLengthBitPackingHybridDecoder.java:62)
>  at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader.readInteger(RunLengthBitPackingHybridValuesReader.java:53)
>  at org.apache.parquet.column.impl.ColumnReaderBase$ValuesReaderIntIterator.nextInt(ColumnReaderBase.java:733)
>  at org.apache.parquet.column.impl.ColumnReaderBase.checkRead(ColumnReaderBase.java:567)
>  at org.apache.parquet.column.impl.ColumnReaderBase.consume(ColumnReaderBase.java:705)
>  at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:30)
>  at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:47)
>  at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:84)
>  at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
>  at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>  at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>  at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>  at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>  at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
>  ... 22 more
> {code}
> What's happening here is that the reader is being handed a page with no 
> values, which Parquet can never handle.
> The root cause is the way Spark treats empty (null) values in optional 
> fields. Concretely, in {{ParquetWriteSupport#write}}, we always indicate to 
> the recordConsumer that we are starting a message 
> ({{recordConsumer#startMessage}}). If a field in the row is null, Spark skips 
> writing it, but it still indicates to the record consumer that the message is 
> finished ({{recordConsumer#endMessage}}). Ending the message causes every 
> column writer to increment its row count for the current page by 1, even 
> though Spark did not necessarily send any values to the underlying page 
> writer. Now suppose the page maximum row count is N: if Spark does this N 
> times within a single page - in particular, if Spark cuts a page boundary and 
> is then given N consecutive null values for an optional column - the column 
> writer will decide it needs to flush the page to the column chunk store and 
> will write out an empty page.
> This will most likely manifest in very sparse columns.
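> To make the write path concrete, here is a simplified, paraphrased sketch of 
> the relevant logic in {{ParquetWriteSupport#write}} (the loop structure and 
> helper names are illustrative, not the exact Spark internals):
> {code:java}
> // Simplified sketch: every row opens and closes a message, but null fields
> // are skipped entirely, so no values reach the underlying page writers.
> def write(row: InternalRow): Unit = {
>   recordConsumer.startMessage()
>   var i = 0
>   while (i < row.numFields) {
>     if (!row.isNullAt(i)) {   // null fields are silently skipped
>       writeField(row, i)      // only non-null fields emit values to the column
>     }
>     i += 1
>   }
>   // endMessage() still runs, so every column writer increments its per-page
>   // row count even though it may have received no values for this row.
>   recordConsumer.endMessage()
> }
> {code}
> With a row-count-based page limit, N consecutive rows that contribute no 
> values to a column are therefore enough to make that column's writer flush a 
> page containing no data at all.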
> A simple demonstration of the issue is given below. Assume this code is 
> manually inserted into {{ParquetIOSuite}}:
> {code:java}
> test("PARQUET-1414 Problems") {
>   // Manually adjust the maximum row count to reproduce the issue on small data
>   sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
>   withTempPath { location =>
>     val path = new Path(location.getCanonicalPath + "/parquet-data")
>     val schema = StructType(
>       Array(StructField("timestamps1", ArrayType(TimestampType))))
>     val rows = ListBuffer[Row]()
>     for (j <- 0 until 10) {
>       rows += Row(
>         null.asInstanceOf[Array[java.sql.Timestamp]])
>     }
>     val srcDf = spark.createDataFrame(
>       sparkContext.parallelize(rows, 3),
>       schema,
>       true)
>     srcDf.write.parquet(path.toString)
>     assert(spark.read.parquet(path.toString).collect.size > 0)
>   }
> }{code}
> Reverting the Parquet patch makes the above test pass.
>  


