[ https://issues.apache.org/jira/browse/SPARK-26874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16767736#comment-16767736 ]
Matt Cheah commented on SPARK-26874:
------------------------------------

[~rdblue] [~cloud_fan] - was wondering if you had any thoughts here, or whether you can confirm my understanding of what's going on.

When we upgrade Parquet to 1.11+, Spark can erroneously write empty pages
--------------------------------------------------------------------------

                Key: SPARK-26874
                URL: https://issues.apache.org/jira/browse/SPARK-26874
            Project: Spark
         Issue Type: Bug
         Components: Input/Output, SQL
   Affects Versions: 2.4.0
           Reporter: Matt Cheah
           Priority: Major

This issue will only come up when Spark upgrades its Parquet dependency to the latest release. It is being filed proactively so the bug can be fixed before we upgrade - it is not something the current unit tests would easily catch, and it could go unnoticed until the community runs scale tests, e.g. during an RC phase.

Parquet introduced a new feature that limits the number of rows written to a page in a column chunk - see PARQUET-1414. Previously, Parquet would only flush pages to the column store after the page writer had filled its buffer with a certain number of bytes. The idea of the Parquet patch is to make page writers also flush to the column store once they have been given a certain number of rows - the default limit is 20000.

With that patch, the Spark Parquet data source erroneously writes empty pages to column chunks, making the Parquet file ultimately unreadable, with exceptions like these:

{code:java}
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/private/var/folders/7f/rrg37sj15r33cwlxbhg7fyg5080xxg/T/spark-2c1b1e5d-c132-42fe-98e1-b523b9baa176/parquet-data/part-00002-b8b4bb3c-b49f-440b-b96c-53fcd19786ad-c000.snappy.parquet
  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:251)
  at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
  at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
  ... 18 more
Caused by: java.lang.IllegalArgumentException: Reading past RLE/BitPacking stream.
  at org.apache.parquet.Preconditions.checkArgument(Preconditions.java:53)
  at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readNext(RunLengthBitPackingHybridDecoder.java:80)
  at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder.readInt(RunLengthBitPackingHybridDecoder.java:62)
  at org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReader.readInteger(RunLengthBitPackingHybridValuesReader.java:53)
  at org.apache.parquet.column.impl.ColumnReaderBase$ValuesReaderIntIterator.nextInt(ColumnReaderBase.java:733)
  at org.apache.parquet.column.impl.ColumnReaderBase.checkRead(ColumnReaderBase.java:567)
  at org.apache.parquet.column.impl.ColumnReaderBase.consume(ColumnReaderBase.java:705)
  at org.apache.parquet.column.impl.ColumnReaderImpl.consume(ColumnReaderImpl.java:30)
  at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:47)
  at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:84)
  at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
  at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
  at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
  at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:165)
  at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:137)
  at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:222)
  ... 22 more
{code}

What is happening here is that the reader is handed a page with no values, which Parquet can never handle.

The root cause is the way Spark treats empty (null) records in optional fields. Concretely, in {{ParquetWriteSupport#write}}, we always tell the recordConsumer that we are starting a message ({{recordConsumer#startMessage}}). Even if Spark is given a null field in the row, it still tells the record consumer that the message is finished ({{recordConsumer#endMessage}}) after the row has been written. Ending the message causes every column writer to increment its row count for the current page by 1, even though Spark did not necessarily send any values to the underlying page writer. Now suppose the page maximum row count is N: if Spark does the above N times within a page - in particular, if Spark cuts a page boundary and is then given N null values in a row for an optional column - the column writer will think it needs to flush the page to the column chunk store and will write out an empty page.

This will most likely manifest in very sparse columns.
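To make that mechanism concrete, here is a rough sketch of the shape of the write path described above. It uses simplified names and is not the exact {{ParquetWriteSupport}} source: {{SketchWriteSupport}}, {{fieldWriters}} and {{writeRow}} are illustrative only, but the {{startMessage()}}/{{endMessage()}} bracketing and the skipping of null fields are the behavior at issue.

{code:java}
import org.apache.parquet.io.api.RecordConsumer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

// Rough sketch only - simplified names, not the exact ParquetWriteSupport source.
class SketchWriteSupport(
    schema: StructType,
    recordConsumer: RecordConsumer,
    fieldWriters: Array[(InternalRow, Int) => Unit]) {

  // Every row is bracketed by startMessage()/endMessage(), even if all of its
  // optional fields are null.
  def writeRow(row: InternalRow): Unit = {
    recordConsumer.startMessage()
    var i = 0
    while (i < row.numFields) {
      if (!row.isNullAt(i)) {
        // Only non-null fields ever send values to that column's page writer.
        recordConsumer.startField(schema(i).name, i)
        fieldWriters(i)(row, i)
        recordConsumer.endField(schema(i).name, i)
      }
      i += 1
    }
    // endMessage() advances the per-page row count of every column writer,
    // including columns that received no values for this row - which is what
    // eventually triggers the empty-page flush once the row-count limit is hit.
    recordConsumer.endMessage()
  }
}
{code}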
A simple demonstration of the issue is given below. Assume this code is manually inserted into {{ParquetIOSuite}}:

{code:java}
test("PARQUET-1414 Problems") {
  // Manually adjust the maximum page row count to reproduce the issue on small data
  sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")
  withTempPath { location =>
    val path = new Path(location.getCanonicalPath + "/parquet-data")
    val schema = StructType(
      Array(StructField("timestamps1", ArrayType(TimestampType))))
    val rows = ListBuffer[Row]()
    for (j <- 0 until 10) {
      rows += Row(null.asInstanceOf[Array[java.sql.Timestamp]])
    }
    val srcDf = spark.createDataFrame(
      sparkContext.parallelize(rows, 3),
      schema,
      true)
    srcDf.write.parquet(path.toString)
    assert(spark.read.parquet(path.toString).collect.size > 0)
  }
}
{code}

Reverting the Parquet patch makes the above test pass.
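As a side note for anyone reproducing this outside of {{ParquetIOSuite}}: the row-count limit is an ordinary Hadoop configuration property, so the same experiment can be driven from any Spark session. A minimal sketch, assuming a local session; only the property name and the 20000 default come from the report above, the rest is illustrative:

{code:java}
import org.apache.spark.sql.SparkSession

// Minimal sketch for reproducing outside ParquetIOSuite (illustrative only).
val spark = SparkSession.builder()
  .master("local[2]")
  .appName("SPARK-26874-repro")
  .getOrCreate()

// A limit of 1 row per page makes the empty-page flush easy to hit with
// null-heavy data, as in the test above.
spark.sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "1")

// Parquet's default is 20000 rows per page, which hides the problem on small datasets:
// spark.sparkContext.hadoopConfiguration.set("parquet.page.row.count.limit", "20000")
{code}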