Matt Cheah created SPARK-26874:
----------------------------------

             Summary: When we upgrade Parquet to 1.11+, Spark can erroneously write empty pages
                 Key: SPARK-26874
                 URL: https://issues.apache.org/jira/browse/SPARK-26874
             Project: Spark
          Issue Type: Bug
          Components: Input/Output, SQL
    Affects Versions: 2.4.0
            Reporter: Matt Cheah


This issue will only come up when Spark upgrades its Parquet dependency to the
latest version (1.11+). It is being filed now so the bug can be fixed
proactively before we upgrade: it would not easily be caught by the current
unit tests and could go unnoticed until community-scale testing, e.g. during
an RC phase.

Parquet introduced a new feature that limits the number of rows written to a
page in a column chunk (see PARQUET-1414). Previously, Parquet flushed a page to
the column store only after the page writer had filled its buffer with a
certain number of bytes. The Parquet patch additionally makes page writers
flush to the column store once they have been given a certain number of rows;
the default limit is 20000.
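To make the change concrete, here is a minimal sketch of the flush rule as
described above, written as a hypothetical {{PageFlushPolicy}} class. It is a
simplified model, not Parquet's actual page writer API, and the 1 MB byte
threshold is only an illustrative value. The property to notice is that the
row-count check fires no matter how much data is actually buffered:

```scala
// Hypothetical, simplified model of a page writer's flush decision.
// The names below are illustrative only; they are not Parquet's API.
final case class PageFlushPolicy(pageSizeBytes: Long, pageRowCountLimit: Int = 20000) {

  // Pre-PARQUET-1414 behavior: flush only once enough bytes are buffered.
  def shouldFlushBySize(bufferedBytes: Long): Boolean =
    bufferedBytes >= pageSizeBytes

  // Behavior as described for PARQUET-1414: flush when either the byte
  // threshold or the row count limit is reached. The row check does not
  // look at how many bytes (or values) were actually buffered.
  def shouldFlush(bufferedBytes: Long, rowsInPage: Int): Boolean =
    shouldFlushBySize(bufferedBytes) || rowsInPage >= pageRowCountLimit
}

object PageFlushPolicyExample {
  def main(args: Array[String]): Unit = {
    val policy = PageFlushPolicy(pageSizeBytes = 1024L * 1024L) // illustrative size
    println(policy.shouldFlush(bufferedBytes = 100L, rowsInPage = 19999)) // false
    println(policy.shouldFlush(bufferedBytes = 100L, rowsInPage = 20000)) // true
    // Nothing buffered at all, yet the row limit still triggers a flush.
    println(policy.shouldFlush(bufferedBytes = 0L, rowsInPage = 20000))   // true
  }
}
```

Under this model a page can be flushed while it holds zero bytes, which is the
precondition for the empty pages discussed in this issue.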

With this patch, the Spark Parquet data source can erroneously write empty
pages to column chunks. This makes the resulting Parquet file unreadable:
reading it fails with exceptions like these:

 
{code:java}
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/private/var/folders/7f/rrg37sj15r33cwlxbhg7fyg5080xxg/T/spark-2c1b1e5d-c132-42fe-98e1-b523b9baa176/parquet-data/part-00002-b8b4bb3c-b49f-440b-b96c-53fcd19786ad-c000.snappy.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
	... 18 more
Caused by: java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.
	at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:53)
	at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readNext(RunLengthBitPackingHybridDecoder.java:80)
	at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readInt(RunLengthBitPackingHybridDecoder.java:62)
	at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader.readInteger(RunLengthBitPackingHybridValuesReader.java:53)
	at org.apache.parquet.column.impl.ColumnReaderBase$ValuesReaderIntIterator.nextInt(ColumnReaderBase.java:733)
	at org.apache.parquet.column.impl.ColumnReaderBase.checkRead(ColumnReaderBase.java:567)
	at org.apache.parquet.column.impl.ColumnReaderBase.consume(ColumnReaderBase.java:705)
	at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:30)
	at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:47)
	at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:84)
	at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
	at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
	at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
	... 22 more
{code}
What is happening here is that the reader is handed a page containing no
values, which the Parquet reader cannot handle: it fails with "Reading past
RLE/BitPacking stream."

The root cause is the way Spark handles empty (null) values in optional
fields. In {{ParquetWriteSupport#write}}, Spark always tells the
recordConsumer that it is starting a message ({{recordConsumer#startMessage}}).
If a field in the row is null, Spark writes nothing for that field, but after
writing the row it still tells the record consumer that the message is
finished ({{recordConsumer#endMessage}}). Ending the message causes every
column writer to increment the row count of its current page by 1, even though
Spark has not necessarily sent any value to that column's page writer. Now
suppose the page row count limit is N. If this happens N times within one
page, in particular if Spark has just cut a page boundary and then receives N
consecutive null values for an optional column, the column writer decides that
it must flush the page to the column chunk store and writes out an empty page.

This is most likely to manifest in very sparse columns.
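The sequence above can be illustrated with a toy simulation. The sketch below
is a hypothetical model that follows this report's description rather than
Parquet's real column writer code: {{ToyColumnWriter}} and {{EmptyPageDemo}}
are illustrative names, a null in the optional column is modeled as sending
nothing to the column writer, and the end of every record still bumps the
page row count. With a limit of N, N consecutive nulls after a page boundary
produce a page containing zero values:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical toy model of one optional column's writer, following the
// description in this issue; it is not Parquet's ColumnWriter implementation.
final class ToyColumnWriter(pageRowCountLimit: Int) {
  private var valuesInPage = 0                      // values actually sent to this column
  private var rowsInPage = 0                        // rows counted at each end of message
  private val flushedPages = ArrayBuffer.empty[Int] // value count of each page written

  // Called only when the record carries a value for this column.
  def writeValue(): Unit = valuesInPage += 1

  // Called for every column when a record ends (cf. recordConsumer#endMessage),
  // whether or not this column received a value.
  def endRow(): Unit = {
    rowsInPage += 1
    if (rowsInPage >= pageRowCountLimit) flushPage()
  }

  // Flushes any trailing partial page and returns the value count per page.
  def close(): Seq[Int] = {
    if (rowsInPage > 0) flushPage()
    flushedPages.toList
  }

  private def flushPage(): Unit = {
    flushedPages += valuesInPage // may be 0, i.e. an empty page
    valuesInPage = 0
    rowsInPage = 0
  }
}

object EmptyPageDemo {
  // Each element says whether that row's value reached the column writer
  // (false models a null in the optional column).
  def pageValueCounts(valueArrives: Seq[Boolean], pageRowCountLimit: Int): Seq[Int] = {
    val writer = new ToyColumnWriter(pageRowCountLimit)
    valueArrives.foreach { arrives =>
      if (arrives) writer.writeValue()
      writer.endRow()
    }
    writer.close()
  }

  def main(args: Array[String]): Unit = {
    // Ten null rows with a limit of 1: every page written is empty.
    println(pageValueCounts(Seq.fill(10)(false), pageRowCountLimit = 1))
    // N = 2: two values fill one page, then two nulls produce an empty page.
    println(pageValueCounts(Seq(true, true, false, false), pageRowCountLimit = 2)) // List(2, 0)
  }
}
```

Each 0 in the output corresponds to a page the reader would later receive with
no values.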

A simple demonstration of the issue is given below. Assume this code is 
manually inserted into {{ParquetIOSuite}}:
{code:java}
test("PARQUET-1414 Problems") {
  // Manually lower the page row count limit to reproduce the issue on small data
  val hadoopConf = sparkContext.hadoopConfiguration
  hadoopConf.set("parquet.page.row.count.limit", "1")
  try {
    withTempPath { location =>
      val path = new Path(location.getCanonicalPath + "/parquet-data")
      // A single optional array column that is null in every row
      val schema = StructType(
        Array(StructField("timestamps1", ArrayType(TimestampType))))
      val rows = Seq.fill(10)(Row(null.asInstanceOf[Array[java.sql.Timestamp]]))
      val srcDf = spark.createDataFrame(
        sparkContext.parallelize(rows, 3),
        schema,
        true)
      srcDf.write.parquet(path.toString)
      // Reading back fails on the empty pages written above
      assert(spark.read.parquet(path.toString).collect().length == 10)
    }
  } finally {
    // Avoid leaking the lowered limit into other tests in the suite
    hadoopConf.unset("parquet.page.row.count.limit")
  }
}
{code}
Reverting the PARQUET-1414 patch makes the above test pass.

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
