[
https://issues.apache.org/jira/browse/SPARK-26874?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matt Cheah resolved SPARK-26874.
--------------------------------
Resolution: Not A Problem
I did some more digging with [~rdblue] and we found that the root problem lies
on the Parquet side, specifically in a feature that hasn't been released yet.
We'll continue pursuing solutions there.
> With PARQUET-1414, Spark can erroneously write empty pages
> ----------------------------------------------------------
>
> Key: SPARK-26874
> URL: https://issues.apache.org/jira/browse/SPARK-26874
> Project: Spark
> Issue Type: Bug
> Components: Input/Output, SQL
> Affects Versions: 2.4.0
> Reporter: Matt Cheah
> Priority: Major
>
> This issue will only come up when Spark upgrades its Parquet dependency to
> the latest off of parquet-mr/master. It is being filed proactively so the bug
> can be fixed before we upgrade - it is not something the current unit tests
> would easily catch, and it could go unnoticed until the community runs scale
> tests, e.g. during an RC phase.
> Parquet introduced a new feature to limit the number of rows written to a
> page in a column chunk - see PARQUET-1414. Previously, Parquet would only
> flush pages to the column store after the page writer had filled its buffer
> with a certain number of bytes. The idea of the Parquet patch is to make page
> writers also flush to the column store once they have been given a certain
> number of rows - the default is 20000.
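> For reference, a minimal sketch (not from the original report) of how that
> limit surfaces to a Spark job; the property name matches the one used in the
> reproduction further below, and the output path is hypothetical:
> {code:java}
> // Sketch only: the PARQUET-1414 row count limit is a plain Hadoop
> // configuration property; lowering it makes page writers cut pages after
> // fewer rows, which the reproduction below exploits to trigger the bug.
> sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1000")
> spark.range(10000).write.parquet("/tmp/parquet-1414-demo")
> {code}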
> The patch makes the Spark Parquet Data Source erroneously write empty pages
> to column chunks, making the Parquet file ultimately unreadable with
> exceptions like these:
>
> {code:java}
> Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/private/var/folders/7f/rrg37sj15r33cwlxbhg7fyg5080xxg/T/spark-2c1b1e5d-c132-42fe-98e1-b523b9baa176/parquet-data/part-00002-b8b4bb3c-b49f-440b-b96c-53fcd19786ad-c000.snappy.parquet
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
>   at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
>   at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
>   at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
>   ... 18 more
> Caused by: java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.
>   at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:53)
>   at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readNext(RunLengthBitPackingHybridDecoder.java:80)
>   at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readInt(RunLengthBitPackingHybridDecoder.java:62)
>   at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader.readInteger(RunLengthBitPackingHybridValuesReader.java:53)
>   at org.apache.parquet.column.impl.ColumnReaderBase$ValuesReaderIntIterator.nextInt(ColumnReaderBase.java:733)
>   at org.apache.parquet.column.impl.ColumnReaderBase.checkRead(ColumnReaderBase.java:567)
>   at org.apache.parquet.column.impl.ColumnReaderBase.consume(ColumnReaderBase.java:705)
>   at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:30)
>   at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:47)
>   at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:84)
>   at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
>   at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
>   at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
>   at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
>   at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
>   ... 22 more
> {code}
> What's happening here is that the reader is handed a page with no values,
> which Parquet cannot handle.
> The root cause is the way Spark treats empty (null) values in optional
> fields. Concretely, in {{ParquetWriteSupport#write}}, we always indicate to
> the record consumer that we are starting a message
> ({{recordConsumer#startMessage}}). If a field in the row is null, Spark skips
> writing that field but still tells the record consumer that the message is
> finished ({{recordConsumer#endMessage}}). Ending the message causes every
> column writer to increment its row count for the current page by 1, even
> though Spark has not necessarily sent any values to the underlying page
> writer. Now suppose the page maximum row count is N: if Spark does the above
> N times within a page - in particular, if Spark cuts a page boundary and is
> then given N null values for an optional column - the column writer decides
> it needs to flush the page to the column chunk store and writes out an empty
> page.
> This will most likely manifest with very sparse columns.
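> A minimal sketch of the write pattern described above (not the actual
> {{ParquetWriteSupport}} source; {{writeFieldValue}} is a hypothetical
> stand-in for Spark's per-type field writers):
> {code:java}
> // Sketch only: every row opens and closes a message, even when all of its
> // optional fields are null, so endMessage() still advances the per-page row
> // count in every column writer.
> def write(row: InternalRow): Unit = {
>   recordConsumer.startMessage()
>   var i = 0
>   while (i < schema.length) {
>     if (!row.isNullAt(i)) {
>       recordConsumer.startField(schema(i).name, i)
>       writeFieldValue(row, i) // hypothetical helper for the real per-type writers
>       recordConsumer.endField(schema(i).name, i)
>     }
>     i += 1
>   }
>   // Called unconditionally: with PARQUET-1414, N consecutive all-null rows
>   // can reach the row count limit and flush a page containing no values.
>   recordConsumer.endMessage()
> }
> {code}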
> A simple demonstration of the issue is given below. Assume this code is
> manually inserted into {{ParquetIOSuite}}:
> {code:java}
> test("PARQUET-1414 Problems") {
>   // Manually adjust the maximum row count to reproduce the issue on small data
>   sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
>   withTempPath { location =>
>     val path = new Path(location.getCanonicalPath + "/parquet-data")
>     val schema = StructType(
>       Array(StructField("timestamps1", ArrayType(TimestampType))))
>     val rows = ListBuffer[Row]()
>     for (j <- 0 until 10) {
>       rows += Row(null.asInstanceOf[Array[java.sql.Timestamp]])
>     }
>     val srcDf = spark.createDataFrame(
>       sparkContext.parallelize(rows, 3),
>       schema,
>       true)
>     srcDf.write.parquet(path.toString)
>     assert(spark.read.parquet(path.toString).collect.size > 0)
>   }
> }
> {code}
> Reverting the Parquet patch makes the above test pass.
>